| 2130 | self.body.stamp(visitor, append_stamps, **headers) |
| 2131 | |
def apply_async(self, args=None, kwargs=None, task_id=None,
                producer=None, publisher=None, connection=None,
                router=None, result_cls=None, **options):
    """Apply the chord: dispatch the header tasks followed by the body.

    Args:
        args: Extra positional arguments.  When non-empty and the
            signature is not immutable they are placed in front of
            ``self.args``; otherwise ``self.args`` is used as-is.
        kwargs: Extra keyword arguments, merged over
            ``self.kwargs['kwargs']``.  A truthy ``'body'`` entry
            overrides the stored ``self.kwargs['body']``.  The mapping
            passed in is never modified.
        task_id: Explicit id for the call.  When ``None`` (non-eager
            path), the ``task_id`` found in the merged options is used.
        producer, publisher, connection, router, result_cls: Accepted
            as part of the signature; not referenced in this method.
        **options: Passed to ``body.clone()`` and merged over
            ``self.options`` before being forwarded to ``self.run()``.

    Returns:
        Whatever ``self.apply()`` returns when
        ``app.conf.task_always_eager`` is set, otherwise whatever
        ``self.run()`` returns.
    """
    # Work on a shallow copy so that popping 'body' below does not
    # strip the key from the dict owned by the caller.
    call_kwargs = dict(kwargs) if kwargs else {}

    if args and not self.immutable:
        args = tuple(args) + tuple(self.args)
    else:
        args = self.args

    body = call_kwargs.pop('body', None) or self.kwargs['body']
    kwargs = dict(self.kwargs['kwargs'], **call_kwargs)
    body = body.clone(**options)
    app = self._get_app(body)

    # The header is always handed on as a group: clone an existing one,
    # or wrap the raw task list in a new group.
    if isinstance(self.tasks, group):
        tasks = self.tasks.clone()
    else:
        tasks = group(self.tasks, app=app,
                      task_id=self.options.get('task_id', uuid()))

    if app.conf.task_always_eager:
        with allow_join_result():
            return self.apply(args, kwargs,
                              body=body, task_id=task_id, **options)

    # NOTE(review): with no call-time options, ``merged_options`` *is*
    # ``self.options``, so the pop below removes 'task_id' from the
    # signature's own options.  Kept as-is because other code may rely
    # on it -- confirm before changing.
    merged_options = dict(self.options, **options) if options else self.options
    option_task_id = merged_options.pop("task_id", None)
    if task_id is None:
        task_id = option_task_id

    # chord([A, B, ...], C)
    return self.run(tasks, body, args, task_id=task_id, kwargs=kwargs,
                    **merged_options)
| 2157 | |
| 2158 | def apply(self, args=None, kwargs=None, |
| 2159 | propagate=True, body=None, **options): |