hub / github.com/celery/celery / test_on_retry

Method test_on_retry

t/unit/worker/test_request.py:529–549
(self)

Source from the content-addressed store, hash-verified

527         job.eventer.send.assert_not_called()
528
529     def test_on_retry(self):
530         job = self.get_request(self.mytask.s(1, f='x'))
531         job.eventer = Mock(name='.eventer')
532         try:
533             raise Retry('foo', KeyError('moofoobar'))
534         except Retry:
535             einfo = ExceptionInfo()
536             job.on_failure(einfo)
537             job.eventer.send.assert_called_with(
538                 'task-retried',
539                 uuid=job.id,
540                 exception=safe_repr(einfo.exception.exc),
541                 traceback=safe_str(einfo.traceback),
542             )
543             prev, module._does_info = module._does_info, False
544             try:
545                 job.on_failure(einfo)
546             finally:
547                 module._does_info = prev
548             einfo.internal = True
549             job.on_failure(einfo)
550
551     def test_compat_properties(self):
552         job = self.xRequest()
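
The listing above combines three testing patterns: raising an exception so that it can be captured inside the `except` block, asserting on a `Mock` eventer, and temporarily overriding a module-level flag with a `try`/`finally` restore. A minimal standalone sketch of the same shape follows, using only the standard library. `FakeRetry`, `FakeRequest`, `flags` and `run_retry_check` are hypothetical stand-ins invented for illustration; they are not Celery's classes and do not reproduce the real behavior of `Request.on_failure`.

```python
from types import SimpleNamespace
from unittest.mock import Mock

# Hypothetical stand-in for a module-level flag such as `module._does_info`.
flags = SimpleNamespace(does_info=True)


class FakeRetry(Exception):
    """Hypothetical stand-in for a retry exception that wraps the original error."""

    def __init__(self, message, exc):
        super().__init__(message)
        self.exc = exc


class FakeRequest:
    """Hypothetical request that reports retries through a mock eventer."""

    def __init__(self, id):
        self.id = id
        self.eventer = Mock(name='.eventer')
        self.logged = []

    def on_failure(self, exc):
        # Assumed behavior for this sketch only: a retry emits an event,
        # and the failure is logged only while the flag is enabled.
        if isinstance(exc, FakeRetry):
            self.eventer.send(
                'task-retried', uuid=self.id, exception=repr(exc.exc),
            )
        if flags.does_info:
            self.logged.append(repr(exc))


def run_retry_check():
    job = FakeRequest('id-1')
    try:
        raise FakeRetry('foo', KeyError('moofoobar'))
    except FakeRetry as exc:
        job.on_failure(exc)
        job.eventer.send.assert_called_with(
            'task-retried', uuid=job.id, exception=repr(exc.exc),
        )
        # Swap the flag off for one call, then restore it even on error.
        prev, flags.does_info = flags.does_info, False
        try:
            job.on_failure(exc)
        finally:
            flags.does_info = prev
    return job
```

Calling `run_retry_check()` exercises `on_failure` once with the flag enabled and once with it disabled, which mirrors how the listing calls `job.on_failure(einfo)` again under a changed `module._does_info`.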

Callers

nothing calls this directly

Calls 4

get_request · Method · 0.95
Retry · Class · 0.90
s · Method · 0.45
on_failure · Method · 0.45

Tested by

no test coverage detected