MCPcopy
hub / github.com/celery/celery / _get_task_meta_for

Method _get_task_meta_for

celery/backends/database/__init__.py:187–202  ·  view source on GitHub ↗

Get task meta-data for a task by id.

(self, task_id)

Source from the content-addressed store, hash-verified

185
186 @retry
187 def _get_task_meta_for(self, task_id):
188 """Get task meta-data for a task by id."""
189 session = self.ResultSession()
190 with session_cleanup(session):
191 task = list(session.query(self.task_cls).filter(self.task_cls.task_id == task_id))
192 task = task and task[0]
193 if not task:
194 task = self.task_cls(task_id)
195 task.status = states.PENDING
196 task.result = None
197 data = task.to_dict()
198 if data.get('args', None) is not None:
199 data['args'] = self.decode(data['args'])
200 if data.get('kwargs', None) is not None:
201 data['kwargs'] = self.decode(data['kwargs'])
202 return self.meta_from_decoded(data)
203
204 @retry
205 def _save_group(self, group_id, result):

Callers

nothing calls this directly

Calls 7

ResultSessionMethod · 0.95
session_cleanupFunction · 0.85
filterMethod · 0.80
meta_from_decodedMethod · 0.80
to_dictMethod · 0.45
getMethod · 0.45
decodeMethod · 0.45

Tested by

no test coverage detected