Handler called if the task raised an exception.
(self, exc_info, send_failed_event=True, return_ok=False)
| 574 | traceback=safe_str(exc_info.traceback)) |
| 575 | |
| 576 | def on_failure(self, exc_info, send_failed_event=True, return_ok=False): |
| 577 | """Handler called if the task raised an exception.""" |
| 578 | task_ready(self) |
| 579 | exc = exc_info.exception |
| 580 | |
| 581 | if isinstance(exc, ExceptionWithTraceback): |
| 582 | exc = exc.exc |
| 583 | |
| 584 | is_terminated = isinstance(exc, Terminated) |
| 585 | if is_terminated: |
| 586 | # If the task was terminated and the task was not cancelled due |
| 587 | # to a connection loss, it is revoked. |
| 588 | |
| 589 | # We always cancel the tasks inside the master process. |
| 590 | # If the request was cancelled, it was not revoked and there's |
| 591 | # nothing to be done. |
| 592 | # According to the comment below, we need to check whether the |
| 593 | # task is already revoked and, if it is not, announce that |
| 594 | # it was. |
| 595 | if not self._already_cancelled and not self._already_revoked: |
| 596 | # This is a special case where the process |
| 597 | # would not have had time to write the result. |
| 598 | self._announce_revoked( |
| 599 | 'terminated', True, str(exc), False) |
| 600 | return |
| 601 | elif isinstance(exc, MemoryError): |
| 602 | raise MemoryError(f'Process got: {exc}') |
| 603 | elif isinstance(exc, Reject): |
| 604 | return self.reject(requeue=exc.requeue) |
| 605 | elif isinstance(exc, Ignore): |
| 606 | return self.acknowledge() |
| 607 | elif isinstance(exc, Retry): |
| 608 | return self.on_retry(exc_info) |
| 609 | |
| 610 | # (acks_late) acknowledge after result stored. |
| 611 | requeue = False |
| 612 | is_worker_lost = isinstance(exc, WorkerLostError) |
| 613 | if self.task.acks_late: |
| 614 | reject = ( |
| 615 | (self.task.reject_on_worker_lost and is_worker_lost) |
| 616 | or (isinstance(exc, TimeLimitExceeded) and not self.task.acks_on_failure_or_timeout) |
| 617 | ) |
| 618 | ack = self.task.acks_on_failure_or_timeout |
| 619 | if reject: |
| 620 | requeue = True |
| 621 | self.reject(requeue=requeue) |
| 622 | send_failed_event = False |
| 623 | elif ack: |
| 624 | self.acknowledge() |
| 625 | else: |
| 626 | # supporting the behaviour where a task failed and |
| 627 | # needs to be removed from the prefetched local queue |
| 628 | self.reject(requeue=False) |
| 629 | |
| 630 | # This is a special case where the task failure handling is done during |
| 631 | # the cold shutdown process. |
| 632 | if state.should_terminate: |
| 633 | return_ok = True |
no test coverage detected