MCPcopy
hub / github.com/celery/celery / join_native

Method join_native

celery/result.py:809–843  ·  view source on GitHub ↗

Backend optimized version of :meth:`join`. .. versionadded:: 2.2 Note that this does not support collecting the results for different task types using different backends. This is currently only supported by the amqp, Redis and cache result backends.

(self, timeout=None, propagate=True,
                    interval=0.5, callback=None, no_ack=True,
                    on_message=None, on_interval=None,
                    disable_sync_subtasks=True)

Source from the content-addressed store, hash-verified

807 )
808
809 def join_native(self, timeout=None, propagate=True,
810 interval=0.5, callback=None, no_ack=True,
811 on_message=None, on_interval=None,
812 disable_sync_subtasks=True):
813 """Backend optimized version of :meth:`join`.
814
815 .. versionadded:: 2.2
816
817 Note that this does not support collecting the results
818 for different task types using different backends.
819
820 This is currently only supported by the amqp, Redis and cache
821 result backends.
822 """
823 if disable_sync_subtasks:
824 assert_will_not_block()
825 order_index = None if callback else {
826 result.id: i for i, result in enumerate(self.results)
827 }
828 acc = None if callback else [None for _ in range(len(self))]
829 for task_id, meta in self.iter_native(timeout, interval, no_ack,
830 on_message, on_interval):
831 if isinstance(meta, list):
832 value = []
833 for children_result in meta:
834 value.append(children_result.get())
835 else:
836 value = meta['result']
837 if propagate and meta['status'] in states.PROPAGATE_STATES:
838 raise value
839 if callback:
840 callback(task_id, value)
841 else:
842 acc[order_index[task_id]] = value
843 return acc
844
845 def _iter_meta(self, **kwargs):
846 return (meta for _, meta in self.backend.get_many(

Calls 4

iter_nativeMethod · 0.95
assert_will_not_blockFunction · 0.85
callbackFunction · 0.85
getMethod · 0.45

Tested by 3

test_join_nativeMethod · 0.64