(self, args=None, kwargs=None, add_to_parent=True,
producer=None, link=None, link_error=None, **options)
| 1593 | return self |
| 1594 | |
| 1595 | def apply_async(self, args=None, kwargs=None, add_to_parent=True, |
| 1596 | producer=None, link=None, link_error=None, **options): |
| 1597 | args = args if args else () |
| 1598 | if link is not None: |
| 1599 | raise TypeError('Cannot add link to group: use a chord') |
| 1600 | if link_error is not None: |
| 1601 | raise TypeError( |
| 1602 | 'Cannot add link to group: do that on individual tasks') |
| 1603 | app = self.app |
| 1604 | if app.conf.task_always_eager: |
| 1605 | return self.apply(args, kwargs, **options) |
| 1606 | if not self.tasks: |
| 1607 | return self.freeze() |
| 1608 | |
| 1609 | options, group_id, root_id = self._freeze_gid(options) |
| 1610 | tasks = self._prepared(self.tasks, [], group_id, root_id, app) |
| 1611 | p = barrier() |
| 1612 | results = list(self._apply_tasks(tasks, producer, app, p, |
| 1613 | args=args, kwargs=kwargs, **options)) |
| 1614 | result = self.app.GroupResult(group_id, results, ready_barrier=p) |
| 1615 | p.finalize() |
| 1616 | |
| 1617 | # - Special case of group(A.s() | group(B.s(), C.s())) |
| 1618 | # That is, group with single item that's a chain but the |
| 1619 | # last task in that chain is a group. |
| 1620 | # |
| 1621 | # We cannot actually support arbitrary GroupResults in chains, |
| 1622 | # but this special case we can. |
| 1623 | if len(result) == 1 and isinstance(result[0], GroupResult): |
| 1624 | result = result[0] |
| 1625 | |
| 1626 | parent_task = app.current_worker_task |
| 1627 | if add_to_parent and parent_task: |
| 1628 | parent_task.add_trail(result) |
| 1629 | return result |
| 1630 | |
| 1631 | def apply(self, args=None, kwargs=None, **options): |
| 1632 | args = args if args else () |
no test coverage detected