MCPcopy
hub / github.com/celery/celery / test_on_failure_Terminated

Method test_on_failure_Terminated

t/unit/worker/test_request.py:276–288  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

274 req.on_ack.assert_called_with(req_logger, req.connection_errors)
275
276 def test_on_failure_Terminated(self):
277 einfo = None
278 try:
279 raise Terminated('9')
280 except Terminated:
281 einfo = ExceptionInfo()
282 assert einfo is not None
283 req = self.get_request(self.add.s(2, 2))
284 req.on_failure(einfo)
285 req.eventer.send.assert_called_with(
286 'task-revoked',
287 uuid=req.id, terminated=True, signum='9', expired=False,
288 )
289
290 def test_on_failure_propagates_MemoryError(self):
291 einfo = None

Callers

nothing calls this directly

Calls 3

get_requestMethod · 0.95
sMethod · 0.45
on_failureMethod · 0.45

Tested by

no test coverage detected