Backend optimized version of :meth:`join`. .. versionadded:: 2.2 Note that this does not support collecting the results for different task types using different backends. This is currently only supported by the amqp, Redis and cache result backends.
(self, timeout=None, propagate=True,
interval=0.5, callback=None, no_ack=True,
on_message=None, on_interval=None,
disable_sync_subtasks=True)
| 807 | ) |
| 808 | |
| 809 | def join_native(self, timeout=None, propagate=True, |
| 810 | interval=0.5, callback=None, no_ack=True, |
| 811 | on_message=None, on_interval=None, |
| 812 | disable_sync_subtasks=True): |
| 813 | """Backend optimized version of :meth:`join`. |
| 814 | |
| 815 | .. versionadded:: 2.2 |
| 816 | |
| 817 | Note that this does not support collecting the results |
| 818 | for different task types using different backends. |
| 819 | |
| 820 | This is currently only supported by the amqp, Redis and cache |
| 821 | result backends. |
| 822 | """ |
| 823 | if disable_sync_subtasks: |
| 824 | assert_will_not_block() |
| 825 | order_index = None if callback else { |
| 826 | result.id: i for i, result in enumerate(self.results) |
| 827 | } |
| 828 | acc = None if callback else [None for _ in range(len(self))] |
| 829 | for task_id, meta in self.iter_native(timeout, interval, no_ack, |
| 830 | on_message, on_interval): |
| 831 | if isinstance(meta, list): |
| 832 | value = [] |
| 833 | for children_result in meta: |
| 834 | value.append(children_result.get()) |
| 835 | else: |
| 836 | value = meta['result'] |
| 837 | if propagate and meta['status'] in states.PROPAGATE_STATES: |
| 838 | raise value |
| 839 | if callback: |
| 840 | callback(task_id, value) |
| 841 | else: |
| 842 | acc[order_index[task_id]] = value |
| 843 | return acc |
| 844 | |
| 845 | def _iter_meta(self, **kwargs): |
| 846 | return (meta for _, meta in self.backend.get_many( |