hub / github.com/celery/celery / _store_result

Method _store_result

celery/backends/base.py:1078–1100
(self, task_id, result, state, traceback=None, request=None, **kwargs)

Source from the content-addressed store, hash-verified

def _store_result(self, task_id, result, state,
                  traceback=None, request=None, **kwargs):
    meta = self._get_result_meta(result=result, state=state,
                                 traceback=traceback, request=request)
    meta['task_id'] = bytes_to_str(task_id)

    # Retrieve metadata from the backend, if the status
    # is a success then we ignore any following update to the state.
    # This solves a task deduplication issue because of network
    # partitioning or lost workers. This issue involved a race condition
    # making a lost task overwrite the last successful result in the
    # result backend.
    current_meta = self._get_task_meta_for(task_id)

    if current_meta['status'] == states.SUCCESS:
        return result

    try:
        self._set_with_state(self.get_key_for_task(task_id), self.encode(meta), state)
    except BackendStoreError as ex:
        raise BackendStoreError(str(ex), state=state, task_id=task_id) from ex

    return result
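The guard described in the source comment (a stored SUCCESS is never overwritten by a later update) can be illustrated with a small in-memory sketch. This is not Celery's implementation: `InMemoryBackend`, its dict store, and the plain string state constants are hypothetical stand-ins, and reporting unknown tasks as PENDING is an assumption made so the first write passes the guard.

```python
# Minimal in-memory sketch of the "SUCCESS is final" guard.
# InMemoryBackend and its dict store are hypothetical, not Celery classes.
SUCCESS = "SUCCESS"
FAILURE = "FAILURE"
PENDING = "PENDING"


class InMemoryBackend:
    def __init__(self):
        self._store = {}  # task_id -> meta dict

    def _get_task_meta_for(self, task_id):
        # Assumption: unknown tasks are reported as PENDING,
        # so the guard lets the first write through.
        return self._store.get(task_id, {"status": PENDING, "result": None})

    def _store_result(self, task_id, result, state):
        current_meta = self._get_task_meta_for(task_id)
        if current_meta["status"] == SUCCESS:
            # A stored success is never overwritten by a later update.
            return result
        self._store[task_id] = {
            "task_id": task_id,
            "status": state,
            "result": result,
        }
        return result


backend = InMemoryBackend()
backend._store_result("abc", 42, SUCCESS)
# A late duplicate execution (e.g. from a worker presumed lost) reports a failure.
backend._store_result("abc", "boom", FAILURE)
final_meta = backend._get_task_meta_for("abc")
```

As in the method above, the ignored call still returns the `result` it was given, so the caller cannot tell from the return value alone whether the write was skipped.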

Callers 1

store_result · Method · 0.45

Calls 6

_get_task_meta_for · Method · 0.95
_set_with_state · Method · 0.95
get_key_for_task · Method · 0.95
BackendStoreError · Class · 0.90
_get_result_meta · Method · 0.80
encode · Method · 0.45
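The `try`/`except` around `_set_with_state` re-raises `BackendStoreError` with the task's `state` and `task_id` attached, chaining the original exception via `from ex`. The following is a rough sketch of that pattern only. The `BackendStoreError` class here is a local stand-in whose keyword-only constructor is an assumption, and `set_with_state`, `store`, and the `task-` key prefix are hypothetical names.

```python
class BackendStoreError(Exception):
    # Local stand-in for illustration; the constructor signature is an assumption.
    def __init__(self, *args, state="", task_id=""):
        super().__init__(*args)
        self.state = state
        self.task_id = task_id


def set_with_state(key, value, state):
    # Hypothetical low-level write that fails without knowing the task id.
    raise BackendStoreError("value too large")


def store(task_id, payload, state):
    try:
        set_with_state(f"task-{task_id}", payload, state)
    except BackendStoreError as ex:
        # Re-raise with task context; "from ex" keeps the original as __cause__.
        raise BackendStoreError(str(ex), state=state, task_id=task_id) from ex


try:
    store("abc", b"payload", "SUCCESS")
except BackendStoreError as err:
    caught = err
```

The re-raised error carries the same message plus the context the lower layer lacked, and the original failure stays reachable through `caught.__cause__` for debugging.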

Tested by

no test coverage detected