hub / github.com/celery/celery / test_setup_heartbeat

Method test_setup_heartbeat

t/unit/worker/test_loops.py:153–163
(self)

Source from the content-addressed store, hash-verified

151 asynloop(*x.args)
152
153     def test_setup_heartbeat(self):
154         x = X(self.app, heartbeat=10)
155         x.hub.timer.call_repeatedly = Mock(name='x.hub.call_repeatedly()')
156         x.blueprint.state = CLOSE
157         asynloop(*x.args)
158         x.consumer.consume.assert_called_with()
159         x.obj.on_ready.assert_called_with()
160         last_call_args, _ = x.hub.timer.call_repeatedly.call_args
161
162         assert last_call_args[0] == 10 / 2.0
163         assert last_call_args[2] == (2.0,)
164
165     def task_context(self, sig, **kwargs):
166         x, on_task = get_task_callback(self.app, **kwargs)
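
The two final assertions in test_setup_heartbeat depend on how a Mock records its most recent call. The sketch below isolates that pattern. Note that `setup_heartbeat`, `timer`, and `tick` are hypothetical stand-ins inferred only from what the test asserts (an interval of heartbeat / 2.0 and a one-element argument tuple holding the rate); this is not Celery's asynloop.

```python
from unittest.mock import Mock


def setup_heartbeat(timer, heartbeat, rate=2.0, tick=None):
    """Hypothetical stand-in for the heartbeat setup, not Celery's code.

    Schedules `tick` every `heartbeat / rate` seconds and passes the
    rate along as a one-element argument tuple.
    """
    timer.call_repeatedly(heartbeat / rate, tick, (rate,))


timer = Mock(name='timer')
tick = Mock(name='tick')
setup_heartbeat(timer, heartbeat=10, tick=tick)

# call_args holds the most recent call as an (args, kwargs) pair,
# so it can be unpacked the same way the test does.
last_call_args, last_call_kwargs = timer.call_repeatedly.call_args

assert last_call_args[0] == 10 / 2.0   # interval: heartbeat / rate
assert last_call_args[1] is tick       # the callback that was scheduled
assert last_call_args[2] == (2.0,)     # positional args for the callback
assert last_call_kwargs == {}          # no keyword arguments were used
```

Indexing into `call_args` rather than using `assert_called_with` lets a test check individual arguments and skip ones it cannot easily name, which appears to be why the original skips index 1 (the callback).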

Callers

nothing calls this directly

Calls 2

asynloop · Function · 0.90
X · Class · 0.70

Tested by

no test coverage detected