Recursively unroll the group into a generator of its tasks. This is used by :meth:`apply_async` and :meth:`apply` to unroll the group into a list of tasks that can be evaluated. Note: This does not change the group itself; it only returns a generator of the tasks that the group would evaluate to.
(self, tasks, partial_args, group_id, root_id, app,
CallableSignature=abstract.CallableSignature,
from_dict=Signature.from_dict,
isinstance=isinstance, tuple=tuple)
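The recursive unrolling described above can be illustrated without Celery. The sketch below is a simplified model, not the library's implementation: `FakeSig`, `FakeGroup`, and `unroll` are hypothetical stand-ins invented for this example, and the generator yields the per-group index in place of the ``AsyncResult`` that the real method obtains from ``freeze()``.

```python
from dataclasses import dataclass, field
from typing import Any, Iterator, List, Tuple


@dataclass
class FakeSig:
    """Hypothetical stand-in for a task signature (not Celery's Signature)."""
    name: str
    args: Tuple[Any, ...] = ()
    immutable: bool = False

    def clone(self) -> "FakeSig":
        # Return a copy so the caller's object is never modified.
        return FakeSig(self.name, tuple(self.args), self.immutable)


@dataclass
class FakeGroup:
    """Hypothetical stand-in for a group: holds sigs or nested groups."""
    tasks: List[Any] = field(default_factory=list)


def unroll(tasks, partial_args, group_id) -> Iterator[Tuple[FakeSig, int, str]]:
    """Yield (task, index, group_id) for every leaf task, depth first."""
    for index, task in enumerate(tasks):
        if isinstance(task, FakeGroup):
            # Nested group: delegate to a recursive generator.
            yield from unroll(task.tasks, partial_args, group_id)
        else:
            task = task.clone()
            # Immutable signatures ignore the partial arguments.
            if partial_args and not task.immutable:
                task.args = tuple(partial_args) + tuple(task.args)
            yield task, index, group_id


inner = FakeGroup([FakeSig("c", (3,)), FakeSig("d", (4,), immutable=True)])
outer = [FakeSig("a", (1,)), inner, FakeSig("b", (2,))]

flat = list(unroll(outer, partial_args=(0,), group_id="g1"))
for task, index, gid in flat:
    print(task.name, task.args, index, gid)
```

In this sketch the nested group is flattened in place, the partial argument `0` is prepended to every mutable signature, and the index restarts inside the nested call because each recursion level runs its own `enumerate`. The original `outer` list is left untouched, since only clones are modified.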
        return tuple(child_task.link_error(sig.clone(immutable=True)) for child_task in self.tasks)

    def _prepared(self, tasks, partial_args, group_id, root_id, app,
                  CallableSignature=abstract.CallableSignature,
                  from_dict=Signature.from_dict,
                  isinstance=isinstance, tuple=tuple):
        """Recursively unroll the group into a generator of its tasks.

        This is used by :meth:`apply_async` and :meth:`apply` to
        unroll the group into a list of tasks that can be evaluated.

        Note:
            This does not change the group itself, it only returns
            a generator of the tasks that the group would evaluate to.

        Arguments:
            tasks (list): List of tasks in the group (may contain nested groups).
            partial_args (list): List of arguments to be prepended to
                the arguments of each task.
            group_id (str): The group id of the group.
            root_id (str): The root id of the group.
            app (Celery): The Celery app instance.
            CallableSignature (class): The signature class of the group's tasks.
            from_dict (fun): Function to create a signature from a dict.
            isinstance (fun): Function to check if an object is an instance
                of a class.
            tuple (class): A tuple-like class.

        Returns:
            generator: A generator for the unrolled group tasks.
                The generator yields tuples of the form ``(task, AsyncResult, group_id)``.
        """
        for index, task in enumerate(tasks):
            if isinstance(task, CallableSignature):
                # local sigs are always of type Signature, and we
                # clone them to make sure we don't modify the originals.
                task = task.clone()
            else:
                # serialized sigs must be converted to Signature.
                task = from_dict(task, app=app)
            if isinstance(task, group):
                # needs yield_from :(
                unroll = task._prepared(
                    task.tasks, partial_args, group_id, root_id, app,
                )
                yield from unroll
            else:
                if partial_args and not task.immutable:
                    task.args = tuple(partial_args) + tuple(task.args)
                yield task, task.freeze(group_id=group_id, root_id=root_id, group_index=index), group_id

    def _apply_tasks(self, tasks, producer=None, app=None, p=None,
                     add_to_parent=None, chord=None,