MCPcopy
hub / github.com/celery/celery / get_task_meta

Method get_task_meta

celery/backends/rpc.py:291–321  ·  view source on GitHub ↗
(self, task_id, backlog_limit=1000)

Source from the content-addressed store, hash-verified

289 self._out_of_band[task_id] = message
290
291 def get_task_meta(self, task_id, backlog_limit=1000):
292 buffered = self._out_of_band.pop(task_id, None)
293 if buffered:
294 return self._set_cache_by_message(task_id, buffered)
295
296 # Polling and using basic_get
297 latest_by_id = {}
298 prev = None
299 for acc in self._slurp_from_queue(task_id, self.accept, backlog_limit):
300 tid = self._get_message_task_id(acc)
301 prev, latest_by_id[tid] = latest_by_id.get(tid), acc
302 if prev:
303 # backends aren't expected to keep history,
304 # so we delete everything except the most recent state.
305 prev.ack()
306 prev = None
307
308 latest = latest_by_id.pop(task_id, None)
309 for tid, msg in latest_by_id.items():
310 self.on_out_of_band_result(tid, msg)
311
312 if latest:
313 latest.requeue()
314 return self._set_cache_by_message(task_id, latest)
315 else:
316 # no new state, use previous
317 try:
318 return self._cache[task_id]
319 except KeyError:
320 # result probably pending.
321 return {'status': states.PENDING, 'result': None}
322 poll = get_task_meta # XXX compat
323
324 def _set_cache_by_message(self, task_id, message):

Callers

nothing calls this directly

Calls 7

_set_cache_by_messageMethod · 0.95
_slurp_from_queueMethod · 0.95
_get_message_task_idMethod · 0.95
on_out_of_band_resultMethod · 0.95
popMethod · 0.45
getMethod · 0.45
itemsMethod · 0.45

Tested by

no test coverage detected