(self, result, no_ack=True, **kwargs)
| 229 | self.result_consumer.buckets[result] = bucket |
| 230 | |
| 231 | def iter_native(self, result, no_ack=True, **kwargs): |
| 232 | self._ensure_not_eager() |
| 233 | |
| 234 | results = result.results |
| 235 | if not results: |
| 236 | raise StopIteration() |
| 237 | |
| 238 | # we tell the result consumer to put consumed results |
| 239 | # into these buckets. |
| 240 | bucket = deque() |
| 241 | for node in results: |
| 242 | if not hasattr(node, '_cache'): |
| 243 | bucket.append(node) |
| 244 | elif node._cache: |
| 245 | bucket.append(node) |
| 246 | else: |
| 247 | self._collect_into(node, bucket) |
| 248 | |
| 249 | for _ in self._wait_for_pending(result, no_ack=no_ack, **kwargs): |
| 250 | while bucket: |
| 251 | node = bucket.popleft() |
| 252 | if not hasattr(node, '_cache'): |
| 253 | yield node.id, node.children |
| 254 | else: |
| 255 | yield node.id, node._cache |
| 256 | while bucket: |
| 257 | node = bucket.popleft() |
| 258 | yield node.id, node._cache |
| 259 | |
| 260 | def add_pending_result(self, result, weak=False, start_drainer=True): |
| 261 | if start_drainer: |
nothing calls this directly
no test coverage detected