MCPcopy
hub / github.com/celery/celery / _freeze_group_tasks

Method _freeze_group_tasks

celery/canvas.py:1818–1867  ·  view source on GitHub ↗

Freeze the tasks in the group. Note: If the group tasks are created from a generator, the tasks generator would not be exhausted, and the tasks would be frozen lazily. Returns: tuple: A tuple of the group id, and the AsyncResult of each of the group tasks.

(self, _id=None, group_id=None, chord=None,
                            root_id=None, parent_id=None, group_index=None)

Source from the content-addressed store, hash-verified

1816 return options, group_id, options.get('root_id')
1817
def _freeze_group_tasks(self, _id=None, group_id=None, chord=None,
                        root_id=None, parent_id=None, group_index=None):
    """Freeze every task of the group and collect their results.

    The group id comes from the ``task_id`` option when it is already
    set; otherwise ``group_id`` (or a fresh uuid) is stored under
    ``task_id`` and used. ``group_id``, ``chord`` and ``group_index``
    are recorded in the options when given, while ``root_id`` and
    ``parent_id`` only fill in options that are not already present.
    ``_id`` is accepted but not read by this method.

    Note:
        When the group tasks come from a generator, that generator is
        not exhausted here; the tasks are frozen lazily instead.

    Returns:
        tuple: The group id, and the AsyncResult of each of the group tasks.
    """
    # pylint: disable=redefined-outer-name
    # XXX chord is also a class in outer scope.
    options = self.options
    try:
        frozen_gid = options['task_id']
    except KeyError:
        frozen_gid = group_id or uuid()
        options['task_id'] = frozen_gid
    if group_id:
        options['group_id'] = group_id
    if chord:
        options['chord'] = chord
    if group_index is not None:
        options['group_index'] = group_index
    root_id = options.setdefault('root_id', root_id)
    parent_id = options.setdefault('parent_id', parent_id)
    # Both freezing paths below forward the same positional arguments.
    freeze_args = (group_id, chord, root_id, parent_id)

    if not isinstance(self.tasks, _regen):
        unrolled = []
        # Subgroups have to be unrolled eagerly so that a chord gets the
        # right result instance for chord_unlock etc.
        frozen = list(self._freeze_unroll(unrolled, *freeze_args))
        if isinstance(self.tasks, MutableSequence):
            # Replace the contents in place, keeping the same container.
            self.tasks[:] = unrolled
        else:
            self.tasks = unrolled
        return frozen_gid, frozen

    # Generator-backed tasks must not be exhausted by the freeze, so the
    # unrolled stream is split in two: one copy is consumed to produce the
    # AsyncResult of each task, the other replaces the group's tasks so
    # they remain in an unexhausted state.
    to_freeze, to_keep = itertools.tee(self._unroll_tasks(self.tasks))
    lazy_results = regen(self._freeze_tasks(to_freeze, *freeze_args))
    self.tasks = regen(to_keep)
    return frozen_gid, lazy_results
1868
1869 def freeze(self, _id=None, group_id=None, chord=None,
1870 root_id=None, parent_id=None, group_index=None):

Callers 2

freezeMethod · 0.95
runMethod · 0.80

Calls 5

_unroll_tasksMethod · 0.95
_freeze_tasksMethod · 0.95
_freeze_unrollMethod · 0.95
regenFunction · 0.90
setdefaultMethod · 0.45

Tested by

no test coverage detected