MCPcopy
hub / github.com/celery/celery / apply_async

Method apply_async

celery/canvas.py:2132–2156  ·  view source on GitHub ↗
(self, args=None, kwargs=None, task_id=None,
                    producer=None, publisher=None, connection=None,
                    router=None, result_cls=None, **options)

Source from the content-addressed store, hash-verified

2130 self.body.stamp(visitor, append_stamps, **headers)
2131
def apply_async(self, args=None, kwargs=None, task_id=None,
                producer=None, publisher=None, connection=None,
                router=None, result_cls=None, **options):
    """Apply this signature asynchronously, or eagerly when so configured.

    Resolves the body, clones it with ``options``, makes sure the header
    tasks are a ``group``, and then delegates to :meth:`apply` when
    ``app.conf.task_always_eager`` is set, or to :meth:`run` otherwise.

    Arguments:
        args: Extra positional arguments. When non-empty and the
            signature is not immutable they are placed in front of
            ``self.args``; otherwise ``self.args`` is used unchanged.
        kwargs: Extra keyword arguments, merged over
            ``self.kwargs['kwargs']``. A truthy ``'body'`` entry
            overrides ``self.kwargs['body']``. The mapping passed in
            is copied and never modified.
        task_id: Explicit task id. When ``None``, the ``task_id`` entry
            of the merged options (if any) is used instead.
        producer, publisher, connection, router, result_cls: Accepted
            for signature compatibility; not used by this method.
        **options: Forwarded to ``body.clone()`` and merged over
            ``self.options`` before being passed on.

    Returns:
        The value returned by :meth:`apply` (eager mode) or :meth:`run`.
    """
    args = args or ()
    # Copy before popping 'body' below: the caller's dict must not lose
    # that key as a side effect of this call.
    kwargs = dict(kwargs) if kwargs else {}
    args = (tuple(args) + tuple(self.args)
            if args and not self.immutable else self.args)
    body = kwargs.pop('body', None) or self.kwargs['body']
    kwargs = dict(self.kwargs['kwargs'], **kwargs)
    body = body.clone(**options)
    app = self._get_app(body)
    # The header must be a group: clone an existing one, otherwise wrap
    # the tasks, reusing this signature's task_id when it has one.
    tasks = (self.tasks.clone() if isinstance(self.tasks, group)
             else group(self.tasks, app=app,
                        task_id=self.options.get('task_id', uuid())))
    if app.conf.task_always_eager:
        with allow_join_result():
            return self.apply(args, kwargs,
                              body=body, task_id=task_id, **options)

    # NOTE(review): when no ``options`` are passed, ``merged_options`` is
    # ``self.options`` itself, so the pop below removes ``task_id`` from
    # this signature. Left as-is; confirm whether that is intended.
    merged_options = dict(self.options, **options) if options else self.options
    option_task_id = merged_options.pop("task_id", None)
    if task_id is None:
        task_id = option_task_id

    # chord([A, B, ...], C)
    return self.run(tasks, body, args, task_id=task_id, kwargs=kwargs, **merged_options)
2157
2158 def apply(self, args=None, kwargs=None,
2159 propagate=True, body=None, **options):

Callers 1

__call__ (Method) · 0.95

Calls 8

_get_app (Method) · 0.95
apply (Method) · 0.95
run (Method) · 0.95
allow_join_result (Function) · 0.90
group (Class) · 0.70
pop (Method) · 0.45
clone (Method) · 0.45
get (Method) · 0.45

Tested by

no test coverage detected