hub / github.com/celery/celery / test_max_restarts_exceeded

Method test_max_restarts_exceeded

t/unit/worker/test_consumer.py:266–278
(self)

Source from the content-addressed store, hash-verified

264         task_reserved.assert_called_with(bucket)
265
266     def test_max_restarts_exceeded(self):
267         c = self.get_consumer()
268
269         def se(*args, **kwargs):
270             c.blueprint.state = CLOSE
271             raise RestartFreqExceeded()
272
273         c._restart_state.step.side_effect = se
274         c.blueprint.start.side_effect = socket.error()
275
276         with patch('celery.worker.consumer.consumer.sleep') as sleep:
277             c.start()
278             sleep.assert_called_with(1)
279
280     def test_do_not_restart_when_closed(self):
281         c = self.get_consumer()
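
The test above relies on two mock side effects: the blueprint's start fails with a socket error, and the restart limiter's step both closes the blueprint and raises RestartFreqExceeded, after which the consumer is expected to sleep for one second. The sketch below reproduces that pattern in isolation. MiniConsumer, run_demo, and the local CLOSE and RestartFreqExceeded definitions are hypothetical stand-ins written for illustration; the loop is an assumption about the general shape of such a consumer, not Celery's actual implementation. Sleep is injected as a parameter here instead of being patched at a module path.

```python
import socket
from unittest.mock import Mock

CLOSE = "CLOSE"  # hypothetical stand-in for the blueprint's closed state


class RestartFreqExceeded(Exception):
    """Hypothetical stand-in for the restart limiter's exception."""


class MiniConsumer:
    """Toy restart loop for illustration only; not Celery's Consumer."""

    def __init__(self, blueprint, restart_state, sleep):
        self.blueprint = blueprint
        self._restart_state = restart_state
        self._sleep = sleep  # injected so a test can observe the back-off
        self.restart_count = 0

    def start(self):
        while self.blueprint.state != CLOSE:
            if self.restart_count:
                # Every restart after the first attempt is rate-checked.
                try:
                    self._restart_state.step()
                except RestartFreqExceeded:
                    self._sleep(1)  # back off when restarts are too frequent
            self.restart_count += 1
            try:
                self.blueprint.start(self)
            except socket.error:
                continue  # connection failed; go around the loop again


def run_demo():
    blueprint = Mock()
    blueprint.state = "RUN"
    # An exception instance as side_effect is raised on every call.
    blueprint.start.side_effect = socket.error()

    restart_state = Mock()

    def se(*args, **kwargs):
        # Close the blueprint so the loop ends, then signal the limit.
        blueprint.state = CLOSE
        raise RestartFreqExceeded()

    restart_state.step.side_effect = se

    sleep = Mock()
    MiniConsumer(blueprint, restart_state, sleep).start()
    return sleep, blueprint, restart_state


if __name__ == "__main__":
    sleep, blueprint, restart_state = run_demo()
    sleep.assert_called_with(1)
```

Setting the state to CLOSE inside the side effect is what keeps the loop from running forever: the failure path is exercised once and the loop condition then ends the run.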

Callers

nothing calls this directly

Calls 3

get_consumer · Method · 0.45
error · Method · 0.45
start · Method · 0.45

Tested by

no test coverage detected