hub / github.com/celery/celery / test_start

Method test_start

t/unit/worker/test_consumer.py:1029–1065
(self)

Source from the content-addressed store, hash-verified

    def test_start(self):
        c = Mock()
        c.app.connection_for_read = _amqp_connection()
        mingle = Mingle(c)
        assert mingle.enabled

        Aig = LimitedSet()
        Big = LimitedSet()
        Aig.add('Aig-1')
        Aig.add('Aig-2')
        Big.add('Big-1')

        I = c.app.control.inspect.return_value = Mock()
        I.hello.return_value = {
            'A@example.com': {
                'clock': 312,
                'revoked': Aig._data,
            },
            'B@example.com': {
                'clock': 29,
                'revoked': Big._data,
            },
            'C@example.com': {
                'error': 'unknown method',
            },
        }

        our_revoked = c.controller.state.revoked = LimitedSet()

        mingle.start(c)
        I.hello.assert_called_with(c.hostname, our_revoked._data)
        c.app.clock.adjust.assert_has_calls([
            call(312), call(29),
        ], any_order=True)
        assert 'Aig-1' in our_revoked
        assert 'Aig-2' in our_revoked
        assert 'Big-1' in our_revoked
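
The behaviour this test asserts can be illustrated with a small standalone sketch. This is not Celery's implementation: `sync_with_replies` is a hypothetical helper written only for illustration, and plain Python sets stand in for `LimitedSet`. It shows the three expectations checked above: replies carrying an `error` key are skipped, the clock is adjusted once per valid reply, and the revoked IDs from each peer are merged into the local set.

```python
from unittest.mock import Mock, call


def sync_with_replies(replies, clock, revoked):
    """Hypothetical stand-in for the reply handling the test expects.

    replies: mapping of node name -> reply dict, shaped like the
             return value of `I.hello` in the test.
    clock:   any object with an `adjust(value)` method.
    revoked: a mutable set of revoked task IDs (assumption: a plain
             set instead of Celery's LimitedSet).
    """
    for node, reply in replies.items():
        if reply.get('error'):
            # The peer did not understand the hello request; ignore it.
            continue
        clock.adjust(reply['clock'])
        revoked.update(reply['revoked'])


replies = {
    'A@example.com': {'clock': 312, 'revoked': {'Aig-1', 'Aig-2'}},
    'B@example.com': {'clock': 29, 'revoked': {'Big-1'}},
    'C@example.com': {'error': 'unknown method'},
}

clock = Mock()
our_revoked = set()
sync_with_replies(replies, clock, our_revoked)

# Same style of assertion as the test: order of clock adjustments is not fixed.
clock.adjust.assert_has_calls([call(312), call(29)], any_order=True)
```

Using `any_order=True` keeps the assertion independent of the order in which replies are processed, which is why the test does not depend on dictionary ordering.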

Callers

nothing calls this directly

Calls 6

add · Method · 0.95
start · Method · 0.95
Mingle · Class · 0.90
LimitedSet · Class · 0.90
_amqp_connection · Function · 0.85
call · Function · 0.85

Tested by

no test coverage detected