(self, task_id, backlog_limit=1000)
| 289 | self._out_of_band[task_id] = message |
| 290 | |
| 291 | def get_task_meta(self, task_id, backlog_limit=1000): |
| 292 | buffered = self._out_of_band.pop(task_id, None) |
| 293 | if buffered: |
| 294 | return self._set_cache_by_message(task_id, buffered) |
| 295 | |
| 296 | # Polling and using basic_get |
| 297 | latest_by_id = {} |
| 298 | prev = None |
| 299 | for acc in self._slurp_from_queue(task_id, self.accept, backlog_limit): |
| 300 | tid = self._get_message_task_id(acc) |
| 301 | prev, latest_by_id[tid] = latest_by_id.get(tid), acc |
| 302 | if prev: |
| 303 | # backends aren't expected to keep history, |
| 304 | # so we delete everything except the most recent state. |
| 305 | prev.ack() |
| 306 | prev = None |
| 307 | |
| 308 | latest = latest_by_id.pop(task_id, None) |
| 309 | for tid, msg in latest_by_id.items(): |
| 310 | self.on_out_of_band_result(tid, msg) |
| 311 | |
| 312 | if latest: |
| 313 | latest.requeue() |
| 314 | return self._set_cache_by_message(task_id, latest) |
| 315 | else: |
| 316 | # no new state, use previous |
| 317 | try: |
| 318 | return self._cache[task_id] |
| 319 | except KeyError: |
| 320 | # result probably pending. |
| 321 | return {'status': states.PENDING, 'result': None} |
| 322 | poll = get_task_meta # XXX compat |
| 323 | |
| 324 | def _set_cache_by_message(self, task_id, message): |
nothing calls this directly
no test coverage detected