MCPcopy
hub / github.com/celery/celery / on_failure

Method on_failure

celery/worker/request.py:576–660  ·  view source on GitHub ↗

Handler called if the task raised an exception.

(self, exc_info, send_failed_event=True, return_ok=False)

Source from the content-addressed store, hash-verified

574 traceback=safe_str(exc_info.traceback))
575
576 def on_failure(self, exc_info, send_failed_event=True, return_ok=False):
577 """Handler called if the task raised an exception."""
578 task_ready(self)
579 exc = exc_info.exception
580
581 if isinstance(exc, ExceptionWithTraceback):
582 exc = exc.exc
583
584 is_terminated = isinstance(exc, Terminated)
585 if is_terminated:
586 # If the task was terminated and the task was not cancelled due
587 # to a connection loss, it is revoked.
588
589 # We always cancel the tasks inside the master process.
590 # If the request was cancelled, it was not revoked and there's
591 # nothing to be done.
592 # According to the comment below, we need to check if the task
593 # is already revoked and if it wasn't, we should announce that
594 # it was.
595 if not self._already_cancelled and not self._already_revoked:
596 # This is a special case where the process
597 # would not have had time to write the result.
598 self._announce_revoked(
599 'terminated', True, str(exc), False)
600 return
601 elif isinstance(exc, MemoryError):
602 raise MemoryError(f'Process got: {exc}')
603 elif isinstance(exc, Reject):
604 return self.reject(requeue=exc.requeue)
605 elif isinstance(exc, Ignore):
606 return self.acknowledge()
607 elif isinstance(exc, Retry):
608 return self.on_retry(exc_info)
609
610 # (acks_late) acknowledge after result stored.
611 requeue = False
612 is_worker_lost = isinstance(exc, WorkerLostError)
613 if self.task.acks_late:
614 reject = (
615 (self.task.reject_on_worker_lost and is_worker_lost)
616 or (isinstance(exc, TimeLimitExceeded) and not self.task.acks_on_failure_or_timeout)
617 )
618 ack = self.task.acks_on_failure_or_timeout
619 if reject:
620 requeue = True
621 self.reject(requeue=requeue)
622 send_failed_event = False
623 elif ack:
624 self.acknowledge()
625 else:
626 # supporting the behaviour where a task failed and
627 # need to be removed from prefetched local queue
628 self.reject(requeue=False)
629
630 # This is a special case where the task failure handling is done during
631 # the cold shutdown process.
632 if state.should_terminate:
633 return_ok = True

Callers 1

on_success · Method · 0.95

Calls 9

_announce_revoked · Method · 0.95
reject · Method · 0.95
acknowledge · Method · 0.95
on_retry · Method · 0.95
send_event · Method · 0.95
get_pickled_exception · Function · 0.90
task_ready · Function · 0.85
mark_as_failure · Method · 0.80
send · Method · 0.45

Tested by

no test coverage detected