(self, task_id, result, state,
traceback=None, request=None, **kwargs)
| 1076 | self.delete(self.get_key_for_task(task_id)) |
| 1077 | |
| 1078 | def _store_result(self, task_id, result, state, |
| 1079 | traceback=None, request=None, **kwargs): |
| 1080 | meta = self._get_result_meta(result=result, state=state, |
| 1081 | traceback=traceback, request=request) |
| 1082 | meta['task_id'] = bytes_to_str(task_id) |
| 1083 | |
| 1084 | # Retrieve metadata from the backend, if the status |
| 1085 | # is a success then we ignore any following update to the state. |
| 1086 | # This solves a task deduplication issue because of network |
| 1087 | # partitioning or lost workers. This issue involved a race condition |
| 1088 | # making a lost task overwrite the last successful result in the |
| 1089 | # result backend. |
| 1090 | current_meta = self._get_task_meta_for(task_id) |
| 1091 | |
| 1092 | if current_meta['status'] == states.SUCCESS: |
| 1093 | return result |
| 1094 | |
| 1095 | try: |
| 1096 | self._set_with_state(self.get_key_for_task(task_id), self.encode(meta), state) |
| 1097 | except BackendStoreError as ex: |
| 1098 | raise BackendStoreError(str(ex), state=state, task_id=task_id) from ex |
| 1099 | |
| 1100 | return result |
| 1101 | |
| 1102 | def _save_group(self, group_id, result): |
| 1103 | self._set_with_state(self.get_key_for_group(group_id), |
no test coverage detected