MCPcopy
hub / github.com/celery/celery / iter_native

Method iter_native

celery/backends/asynchronous.py:231–258  ·  view source on GitHub ↗
(self, result, no_ack=True, **kwargs)

Source from the content-addressed store, hash-verified

229 self.result_consumer.buckets[result] = bucket
230
231 def iter_native(self, result, no_ack=True, **kwargs):
232 self._ensure_not_eager()
233
234 results = result.results
235 if not results:
236 raise StopIteration()
237
238 # we tell the result consumer to put consumed results
239 # into these buckets.
240 bucket = deque()
241 for node in results:
242 if not hasattr(node, '_cache'):
243 bucket.append(node)
244 elif node._cache:
245 bucket.append(node)
246 else:
247 self._collect_into(node, bucket)
248
249 for _ in self._wait_for_pending(result, no_ack=no_ack, **kwargs):
250 while bucket:
251 node = bucket.popleft()
252 if not hasattr(node, '_cache'):
253 yield node.id, node.children
254 else:
255 yield node.id, node._cache
256 while bucket:
257 node = bucket.popleft()
258 yield node.id, node._cache
259
260 def add_pending_result(self, result, weak=False, start_drainer=True):
261 if start_drainer:

Callers

nothing calls this directly

Calls 3

_collect_intoMethod · 0.95
_wait_for_pendingMethod · 0.95
_ensure_not_eagerMethod · 0.45

Tested by

no test coverage detected