MCPcopy
hub / github.com/celery/celery / get_many

Method get_many

celery/backends/base.py:1037–1073  ·  view source on GitHub ↗
(self, task_ids, timeout=None, interval=0.5, no_ack=True,
                 on_message=None, on_interval=None, max_iterations=None,
                 READY_STATES=states.READY_STATES)

Source from the content-addressed store, hash-verified

1035 }
1036
1037 def get_many(self, task_ids, timeout=None, interval=0.5, no_ack=True,
1038 on_message=None, on_interval=None, max_iterations=None,
1039 READY_STATES=states.READY_STATES):
1040 interval = 0.5 if interval is None else interval
1041 ids = task_ids if isinstance(task_ids, set) else set(task_ids)
1042 cached_ids = set()
1043 cache = self._cache
1044 for task_id in ids:
1045 try:
1046 cached = cache[task_id]
1047 except KeyError:
1048 pass
1049 else:
1050 if cached['status'] in READY_STATES:
1051 yield bytes_to_str(task_id), cached
1052 cached_ids.add(task_id)
1053
1054 ids.difference_update(cached_ids)
1055 iterations = 0
1056 while ids:
1057 keys = list(ids)
1058 r = self._mget_to_results(self.mget([self.get_key_for_task(k)
1059 for k in keys]), keys, READY_STATES)
1060 cache.update(r)
1061 ids.difference_update({bytes_to_str(v) for v in r})
1062 for key, value in r.items():
1063 if on_message is not None:
1064 on_message(value)
1065 yield bytes_to_str(key), value
1066 if timeout and iterations * interval >= timeout:
1067 raise TimeoutError(f'Operation timed out ({timeout})')
1068 if on_interval:
1069 on_interval()
1070 time.sleep(interval) # don't busy loop.
1071 iterations += 1
1072 if max_iterations and iterations >= max_iterations:
1073 break
1074
1075 def _forget(self, task_id):
1076 self.delete(self.get_key_for_task(task_id))

Callers 2

_iter_metaMethod · 0.45
iter_nativeMethod · 0.45

Calls 8

_mget_to_resultsMethod · 0.95
mgetMethod · 0.95
get_key_for_taskMethod · 0.95
TimeoutErrorClass · 0.90
addMethod · 0.45
updateMethod · 0.45
itemsMethod · 0.45
sleepMethod · 0.45

Tested by

no test coverage detected