hub / github.com/celery/celery / _apply_tasks

Method _apply_tasks

celery/canvas.py:1742–1804

Run all the tasks in the group. This is used by :meth:`apply_async` to run all the tasks in the group and return a generator of their results.

(self, tasks, producer=None, app=None, p=None,
 add_to_parent=None, chord=None,
 args=None, kwargs=None, group_index=None, **options)
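
The docstring describes `args` as prepended to each task's arguments, and `kwargs` and `options` as merged with each task's own. The sketch below illustrates that wording in plain Python. `merge_call` is a hypothetical helper, not part of Celery, and the choice that caller-supplied keys win on a conflict is an assumption made for the example.

```python
def merge_call(task_args, task_kwargs, task_options,
               args=None, kwargs=None, options=None):
    """Hypothetical helper: combine a task's own call data with extras.

    Assumption: on a key conflict the caller-supplied value wins.
    """
    # Extra positional arguments go in front of the task's own.
    merged_args = tuple(args or ()) + tuple(task_args)
    # Keyword arguments and options are merged key by key.
    merged_kwargs = {**task_kwargs, **(kwargs or {})}
    merged_options = {**task_options, **(options or {})}
    return merged_args, merged_kwargs, merged_options


result = merge_call(
    (2,), {"y": 1}, {"queue": "default"},
    args=(10,), kwargs={"z": 3}, options={"queue": "fast"},
)
print(result)
```

Because the same `args`, `kwargs` and `options` are passed to every task in the loop, each member of the group receives the same extras on top of its own call data.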

Source from the content-addressed store, hash-verified

                yield task, task.freeze(group_id=group_id, root_id=root_id, group_index=index), group_id

    def _apply_tasks(self, tasks, producer=None, app=None, p=None,
                     add_to_parent=None, chord=None,
                     args=None, kwargs=None, group_index=None, **options):
        """Run all the tasks in the group.

        This is used by :meth:`apply_async` to run all the tasks in the group
        and return a generator of their results.

        Arguments:
            tasks (list): List of tasks in the group.
            producer (Producer): The producer to use to publish the tasks.
            app (Celery): The Celery app instance.
            p (barrier): Barrier object to synchronize the tasks results.
            args (list): List of arguments to be prepended to
                the arguments of each task.
            kwargs (dict): Dict of keyword arguments to be merged with
                the keyword arguments of each task.
            **options (dict): Options to be merged with the options of each task.

        Returns:
            generator: A generator for the AsyncResult of the tasks in the group.
        """
        # pylint: disable=redefined-outer-name
        # XXX chord is also a class in outer scope.
        app = app or self.app
        with app.producer_or_acquire(producer) as producer:
            # Iterate through tasks two at a time. If tasks is a generator,
            # we are able to tell when we are at the end by checking if
            # next_task is None. This enables us to set the chord size
            # without burning through the entire generator. See #3021.
            chord_size = 0
            tasks_shifted, tasks = itertools.tee(tasks)
            next(tasks_shifted, None)
            next_task = next(tasks_shifted, None)

            for task_index, current_task in enumerate(tasks):
                # We expect that each task must be part of the same group which
                # seems sensible enough. If that's somehow not the case we'll
                # end up messing up chord counts and there are all sorts of
                # awful race conditions to think about. We'll hope it's not!
                sig, res, group_id = current_task
                chord_obj = chord if chord is not None else sig.options.get("chord")
                # We need to check the chord size of each contributing task so
                # that when we get to the final one, we can correctly set the
                # size in the backend and the chord can be sensible completed.
                chord_size += _chord._descend(sig)
                if chord_obj is not None and next_task is None:
                    # Per above, sanity check that we only saw one group
                    app.backend.set_chord_size(group_id, chord_size)
                sig.apply_async(producer=producer, add_to_parent=False,
                                chord=chord_obj, args=args, kwargs=kwargs,
                                **options)
                # adding callback to result, such that it will gradually
                # fulfill the barrier.
                #
                # Using barrier.add would use result.then, but we need
                # to add the weak argument here to only create a weak
                # reference to the object.
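
The comments in the listing describe a look-ahead over the task iterator: `itertools.tee` produces a second iterator that runs one element ahead, so the loop can tell when it is on the final task without exhausting a generator first. The following is a minimal standalone sketch of that idea, not Celery's code. The names `with_last_flag`, `publish_with_total` and `numbers` are hypothetical, and the sketch uses a private sentinel object where the listing compares against `None`.

```python
import itertools


def with_last_flag(items):
    """Yield (item, is_last) pairs without materializing `items`."""
    # tee gives two independent iterators over the same source.
    lookahead, items = itertools.tee(items)
    # Advance the look-ahead copy once so it stays one element ahead.
    next(lookahead, None)
    sentinel = object()  # assumption: a sentinel instead of None
    for item in items:
        upcoming = next(lookahead, sentinel)
        yield item, upcoming is sentinel


def publish_with_total(sized_items, set_total):
    """Accumulate sizes and report the total only on the final item."""
    total = 0
    for (name, size), is_last in with_last_flag(sized_items):
        total += size
        if is_last:
            set_total(total)
        yield name


def numbers(log):
    """Generator that records each value as it is produced."""
    for n in range(4):
        log.append(n)
        yield n


log = []
flagged = with_last_flag(numbers(log))
first = next(flagged)  # only two source elements have been pulled so far

totals = []
names = list(publish_with_total(iter([("a", 1), ("b", 3), ("c", 2)]),
                                totals.append))
```

At any point the source has been consumed at most one element past the item being processed, which is what lets the running total be committed on the last item while the rest of the generator stays lazy.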

Callers 1

apply_async · Method · 0.95

Calls 6

producer_or_acquire · Method · 0.80
_descend · Method · 0.80
get · Method · 0.45
set_chord_size · Method · 0.45
apply_async · Method · 0.45
then · Method · 0.45

Tested by

no test coverage detected