Update task state and result. If always_retry_backend_operation is activated, then in the event of a recoverable exception the operation is retried with an exponential backoff until a limit has been reached.
(self, task_id, result, state,
traceback=None, request=None, **kwargs)
| 610 | time.sleep(amount) |
| 611 | |
def store_result(self, task_id, result, state,
                 traceback=None, request=None, **kwargs):
    """Encode *result* and store it together with the task *state*.

    The value produced by ``self.encode_result`` is passed to
    ``self._store_result`` and returned once that call succeeds.

    If the store raises while ``self.always_retry`` is truthy and
    ``self.exception_safe_to_retry`` accepts the exception, the store is
    attempted again after an exponential-backoff sleep, for at most
    ``self.max_retries`` retries.  When the retries are used up, a
    ``BackendStoreError`` is handed to ``raise_with_context``.  Any
    exception that is not eligible for retry is re-raised unchanged.
    """
    encoded = self.encode_result(result, state)
    attempt = 0

    while True:
        try:
            self._store_result(
                task_id, encoded, state, traceback,
                request=request, **kwargs
            )
        except Exception as exc:
            retryable = self.always_retry and self.exception_safe_to_retry(exc)
            if not retryable:
                raise

            if attempt < self.max_retries:
                attempt += 1

                # A failing hook must not abort the retry loop: log it
                # and carry on with the backoff.
                try:
                    self.on_backend_retryable_error(exc)
                except Exception:
                    logger.exception(
                        "on_backend_retryable_error hook failed; continuing retry loop",
                    )

                # The backoff helper yields whole milliseconds; convert to
                # (possibly fractional) seconds before sleeping.
                backoff_ms = get_exponential_backoff_interval(
                    self.base_sleep_between_retries_ms,
                    attempt,
                    self.max_sleep_between_retries_ms,
                    True,
                )
                self._sleep(backoff_ms / 1000)
            else:
                raise_with_context(
                    BackendStoreError("failed to store result on the backend", task_id=task_id, state=state),
                )
        else:
            return encoded
| 651 | |
| 652 | def forget(self, task_id): |
| 653 | self._cache.pop(task_id, None) |
no test coverage detected