(self, task_ids, timeout=None, interval=0.5, no_ack=True,
on_message=None, on_interval=None, max_iterations=None,
READY_STATES=states.READY_STATES)
| 1035 | } |
| 1036 | |
def get_many(self, task_ids, timeout=None, interval=0.5, no_ack=True,
             on_message=None, on_interval=None, max_iterations=None,
             READY_STATES=states.READY_STATES):
    """Yield ``(task_id, meta)`` pairs as the given tasks become ready.

    Entries already present in ``self._cache`` whose ``'status'`` is in
    ``READY_STATES`` are yielded first.  The remaining ids are then
    fetched repeatedly through ``self.mget`` until every id has been
    resolved, the timeout elapses, or ``max_iterations`` is reached.

    Arguments:
        task_ids: Iterable of task ids to wait for.  It is copied, so
            the caller's collection is never modified.
        timeout: If truthy, raise once ``iterations * interval`` reaches
            this value while ids are still outstanding.
        interval: Argument passed to ``time.sleep`` between fetches;
            ``None`` is treated as ``0.5``.
        no_ack: Accepted for interface compatibility; not used by this
            implementation.
        on_message: Optional callable invoked with each fetched value
            just before that value is yielded (not invoked for values
            served from the cache).
        on_interval: Optional callable invoked before each sleep.
        max_iterations: If truthy, stop after this many fetch rounds
            even when ids remain outstanding.
        READY_STATES: Collection of statuses considered ready.

    Yields:
        Tuple of ``(bytes_to_str(task_id), value)``.

    Raises:
        TimeoutError: When ``timeout`` is exceeded while ids are still
            outstanding.
    """
    interval = 0.5 if interval is None else interval
    # Always work on a private copy: the set is shrunk below, and using
    # the caller's own set would both empty it as a side effect and
    # expose this generator to concurrent modification between yields.
    ids = set(task_ids)
    cache = self._cache

    # Serve whatever is already cached and ready.
    cached_ids = set()
    for task_id in ids:
        try:
            cached = cache[task_id]
        except KeyError:
            continue
        if cached['status'] in READY_STATES:
            yield bytes_to_str(task_id), cached
            cached_ids.add(task_id)
    ids.difference_update(cached_ids)

    iterations = 0
    while ids:
        keys = list(ids)
        # NOTE(review): ``_mget_to_results`` is assumed to return a
        # mapping of task id -> value restricted to READY_STATES;
        # confirm against its definition.
        r = self._mget_to_results(
            self.mget([self.get_key_for_task(k) for k in keys]),
            keys, READY_STATES)
        cache.update(r)
        ids.difference_update({bytes_to_str(k) for k in r})
        for key, value in r.items():
            if on_message is not None:
                on_message(value)
            yield bytes_to_str(key), value
        if not ids:
            # Everything has been delivered: do not report a timeout
            # for a completed operation and do not sleep for nothing.
            break
        if timeout and iterations * interval >= timeout:
            raise TimeoutError(f'Operation timed out ({timeout})')
        if on_interval:
            on_interval()
        time.sleep(interval)  # don't busy loop.
        iterations += 1
        if max_iterations and iterations >= max_iterations:
            break
| 1074 | |
def _forget(self, task_id):
    """Delete the stored entry for ``task_id``.

    The storage key is obtained from ``get_key_for_task`` and then
    handed to ``delete``.
    """
    storage_key = self.get_key_for_task(task_id)
    self.delete(storage_key)
no test coverage detected