Freeze the tasks in the group. Note: If the group tasks are created from a generator, the tasks generator would not be exhausted, and the tasks would be frozen lazily. Returns: tuple: A tuple of the group id, and the AsyncResult of each of the gr
(self, _id=None, group_id=None, chord=None,
root_id=None, parent_id=None, group_index=None)
| 1816 | return options, group_id, options.get('root_id') |
| 1817 | |
| 1818 | def _freeze_group_tasks(self, _id=None, group_id=None, chord=None, |
| 1819 | root_id=None, parent_id=None, group_index=None): |
| 1820 | """Freeze the tasks in the group. |
| 1821 | |
| 1822 | Note: |
| 1823 | If the group tasks are created from a generator, the tasks generator would |
| 1824 | not be exhausted, and the tasks would be frozen lazily. |
| 1825 | |
| 1826 | Returns: |
| 1827 | tuple: A tuple of the group id, and the AsyncResult of each of the group tasks. |
| 1828 | """ |
| 1829 | # pylint: disable=redefined-outer-name |
| 1830 | # XXX chord is also a class in outer scope. |
| 1831 | opts = self.options |
| 1832 | try: |
| 1833 | gid = opts['task_id'] |
| 1834 | except KeyError: |
| 1835 | gid = opts['task_id'] = group_id or uuid() |
| 1836 | if group_id: |
| 1837 | opts['group_id'] = group_id |
| 1838 | if chord: |
| 1839 | opts['chord'] = chord |
| 1840 | if group_index is not None: |
| 1841 | opts['group_index'] = group_index |
| 1842 | root_id = opts.setdefault('root_id', root_id) |
| 1843 | parent_id = opts.setdefault('parent_id', parent_id) |
| 1844 | if isinstance(self.tasks, _regen): |
| 1845 | # When the group tasks are a generator, we need to make sure we don't |
| 1846 | # exhaust it during the freeze process. We use two generators to do this. |
| 1847 | # One generator will be used to freeze the tasks to get their AsyncResult. |
| 1848 | # The second generator will be used to replace the tasks in the group with an unexhausted state. |
| 1849 | |
| 1850 | # Create two new generators from the original generator of the group tasks (cloning the tasks). |
| 1851 | tasks1, tasks2 = itertools.tee(self._unroll_tasks(self.tasks)) |
| 1852 | # Use the first generator to freeze the group tasks to acquire the AsyncResult for each task. |
| 1853 | results = regen(self._freeze_tasks(tasks1, group_id, chord, root_id, parent_id)) |
| 1854 | # Use the second generator to replace the exhausted generator of the group tasks. |
| 1855 | self.tasks = regen(tasks2) |
| 1856 | else: |
| 1857 | new_tasks = [] |
| 1858 | # Need to unroll subgroups early so that chord gets the |
| 1859 | # right result instance for chord_unlock etc. |
| 1860 | results = list(self._freeze_unroll( |
| 1861 | new_tasks, group_id, chord, root_id, parent_id, |
| 1862 | )) |
| 1863 | if isinstance(self.tasks, MutableSequence): |
| 1864 | self.tasks[:] = new_tasks |
| 1865 | else: |
| 1866 | self.tasks = new_tasks |
| 1867 | return gid, results |
| 1868 | |
| 1869 | def freeze(self, _id=None, group_id=None, chord=None, |
| 1870 | root_id=None, parent_id=None, group_index=None): |
no test coverage detected