Get task meta from backend. If always_retry_backend_operation is activated, then in the event of a recoverable exception the operation is retried with an exponential backoff until a retry limit has been reached.
(self, task_id, cache=True)
| 700 | return None |
| 701 | |
def get_task_meta(self, task_id, cache=True):
    """Fetch the metadata stored in the backend for ``task_id``.

    With ``cache`` enabled, an entry already present in ``self._cache`` is
    returned without querying the backend, and a freshly fetched result is
    stored there only when its status equals ``states.SUCCESS``.

    When ``self.always_retry`` is set and ``exception_safe_to_retry``
    accepts the error, the fetch is attempted again after a backoff sleep,
    for at most ``self.max_retries`` retries.  Once that budget is spent a
    ``BackendGetMetaError`` is handed to ``raise_with_context``; any error
    that is not eligible for retry is re-raised unchanged.
    """
    self._ensure_not_eager()

    if cache:
        try:
            return self._cache[task_id]
        except KeyError:
            # Cache miss: fall through and ask the backend.
            pass

    attempt = 0
    while True:
        try:
            meta = self._get_task_meta_for(task_id)
        except Exception as exc:
            retryable = self.always_retry and self.exception_safe_to_retry(exc)
            if not retryable:
                # Must stay inside the handler so the active exception
                # propagates untouched.
                raise

            if attempt < self.max_retries:
                attempt += 1

                # A failing notification hook is logged and otherwise
                # ignored so that it cannot abort the retry loop.
                try:
                    self.on_backend_retryable_error(exc)
                except Exception:
                    logger.exception(
                        "on_backend_retryable_error hook failed; continuing retry loop",
                    )

                # The backoff helper works in integer milliseconds, while
                # the sleep call takes (possibly fractional) seconds.
                backoff_ms = get_exponential_backoff_interval(
                    self.base_sleep_between_retries_ms,
                    attempt,
                    self.max_sleep_between_retries_ms,
                    True,
                )
                self._sleep(backoff_ms / 1000)
            else:
                # Retry budget exhausted; called from within the handler
                # so the helper can see the active exception.
                raise_with_context(
                    BackendGetMetaError("failed to get meta", task_id=task_id),
                )
        else:
            break

    # Only successful results are remembered in the cache.
    should_store = cache and meta.get('status') == states.SUCCESS
    if should_store:
        self._cache[task_id] = meta
    return meta
| 746 | |
| 747 | def reload_task_result(self, task_id): |
| 748 | """Reload task result, even if it has been previously fetched.""" |