MCPcopy
hub / github.com/celery/celery / test_collects_at_restart

Method test_collects_at_restart

t/unit/worker/test_consumer.py:367–373  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

365 c.start()
366
367 def test_collects_at_restart(self):
368 c = self.get_consumer()
369 c.connection.collect.side_effect = MemoryError()
370 c.blueprint.start.side_effect = socket.error()
371 c.blueprint.restart.side_effect = self._closer(c)
372 c.start()
373 c.connection.collect.assert_called_with()
374
375 def test_register_with_event_loop(self):
376 c = self.get_consumer()

Callers

nothing calls this directly

Calls 4

_closerMethod · 0.95
get_consumerMethod · 0.45
errorMethod · 0.45
startMethod · 0.45

Tested by

no test coverage detected