hub / github.com/celery/celery / test_callbacks

Method test_callbacks

t/unit/worker/test_strategy.py:168–174
(self)

Source from the content-addressed store, hash-verified

166             assert C.event_sent()
167
168     def test_callbacks(self):
169         with self._context(self.add.s(2, 2)) as C:
170             callbacks = [Mock(name='cb1'), Mock(name='cb2')]
171             C(callbacks=callbacks)
172             req = C.get_request()
173             for callback in callbacks:
174                 callback.assert_called_with(req)
175
176     def test_log_task_received(self, caplog):
177         caplog.set_level(logging.INFO, logger="celery.worker.strategy")
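
The pattern this test exercises can be sketched outside Celery. The snippet below is a minimal illustration, not Celery's implementation: `FakeRequest` and `run_strategy` are hypothetical stand-ins invented here, and only `unittest.mock.Mock` and its `assert_called_with` method are real APIs. It shows the same check as the test: every callback passed in should be called with the single request object that was built.

```python
from unittest.mock import Mock


class FakeRequest:
    """Hypothetical stand-in for the request object a strategy builds."""

    def __init__(self, task_name, args):
        self.task_name = task_name
        self.args = args


def run_strategy(task_name, args, callbacks=None):
    """Hypothetical, simplified strategy: build one request, pass it to each callback."""
    req = FakeRequest(task_name, args)
    for callback in callbacks or []:
        callback(req)
    return req


# Same shape as the test: two named mocks, one invocation, one assertion per mock.
callbacks = [Mock(name='cb1'), Mock(name='cb2')]
req = run_strategy('add', (2, 2), callbacks=callbacks)
for callback in callbacks:
    # Raises AssertionError if the most recent call was not callback(req).
    callback.assert_called_with(req)
```

Note that `assert_called_with` checks only the most recent call to each mock, so a check like this would still pass if a callback had been invoked more than once; `assert_called_once_with` is the stricter variant.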

Callers

nothing calls this directly

Calls 4

_context · Method · 0.95
C · Class · 0.70
s · Method · 0.45
get_request · Method · 0.45

Tested by

no test coverage detected