Run all the tasks in the group. This is used by :meth:`apply_async` to run all the tasks in the group and return a generator of their results. Arguments: tasks (list): List of tasks in the group. producer (Producer): The producer to use to publish the tasks.
(self, tasks, producer=None, app=None, p=None,
add_to_parent=None, chord=None,
args=None, kwargs=None, group_index=None, **options)
| 1740 | yield task, task.freeze(group_id=group_id, root_id=root_id, group_index=index), group_id |
| 1741 | |
| 1742 | def _apply_tasks(self, tasks, producer=None, app=None, p=None, |
| 1743 | add_to_parent=None, chord=None, |
| 1744 | args=None, kwargs=None, group_index=None, **options): |
| 1745 | """Run all the tasks in the group. |
| 1746 | |
| 1747 | This is used by :meth:`apply_async` to run all the tasks in the group |
| 1748 | and return a generator of their results. |
| 1749 | |
| 1750 | Arguments: |
| 1751 | tasks (list): List of tasks in the group. |
| 1752 | producer (Producer): The producer to use to publish the tasks. |
| 1753 | app (Celery): The Celery app instance. |
| 1754 | p (barrier): Barrier object to synchronize the tasks results. |
| 1755 | args (list): List of arguments to be prepended to |
| 1756 | the arguments of each task. |
| 1757 | kwargs (dict): Dict of keyword arguments to be merged with |
| 1758 | the keyword arguments of each task. |
| 1759 | **options (dict): Options to be merged with the options of each task. |
| 1760 | |
| 1761 | Returns: |
| 1762 | generator: A generator for the AsyncResult of the tasks in the group. |
| 1763 | """ |
| 1764 | # pylint: disable=redefined-outer-name |
| 1765 | # XXX chord is also a class in outer scope. |
| 1766 | app = app or self.app |
| 1767 | with app.producer_or_acquire(producer) as producer: |
| 1768 | # Iterate through tasks two at a time. If tasks is a generator, |
| 1769 | # we are able to tell when we are at the end by checking if |
| 1770 | # next_task is None. This enables us to set the chord size |
| 1771 | # without burning through the entire generator. See #3021. |
| 1772 | chord_size = 0 |
| 1773 | tasks_shifted, tasks = itertools.tee(tasks) |
| 1774 | next(tasks_shifted, None) |
| 1775 | next_task = next(tasks_shifted, None) |
| 1776 | |
| 1777 | for task_index, current_task in enumerate(tasks): |
| 1778 | # We expect that each task must be part of the same group which |
| 1779 | # seems sensible enough. If that's somehow not the case we'll |
| 1780 | # end up messing up chord counts and there are all sorts of |
| 1781 | # awful race conditions to think about. We'll hope it's not! |
| 1782 | sig, res, group_id = current_task |
| 1783 | chord_obj = chord if chord is not None else sig.options.get("chord") |
| 1784 | # We need to check the chord size of each contributing task so |
| 1785 | # that when we get to the final one, we can correctly set the |
| 1786 | # size in the backend and the chord can be sensibly completed. |
| 1787 | chord_size += _chord._descend(sig) |
| 1788 | if chord_obj is not None and next_task is None: |
| 1789 | # Per above, sanity check that we only saw one group |
| 1790 | app.backend.set_chord_size(group_id, chord_size) |
| 1791 | sig.apply_async(producer=producer, add_to_parent=False, |
| 1792 | chord=chord_obj, args=args, kwargs=kwargs, |
| 1793 | **options) |
| 1794 | # adding callback to result, such that it will gradually |
| 1795 | # fulfill the barrier. |
| 1796 | # |
| 1797 | # Using barrier.add would use result.then, but we need |
| 1798 | # to add the weak argument here to only create a weak |
| 1799 | # reference to the object. |
no test coverage detected