MCPcopy
hub / github.com/celery/celery / run

Method run

celery/canvas.py:2205–2265  ·  view source on GitHub ↗

Execute the chord. Executing the chord means executing the header and sending the result to the body. In case of an empty header, the body is executed immediately. Arguments: header (group): The header to execute. body (Signature): The body t

(self, header, body, partial_args, app=None, interval=None,
            countdown=1, max_retries=None, eager=False,
            task_id=None, kwargs=None, **options)

Source from the content-addressed store, hash-verified

2203 return sum(self._descend(task) for task in tasks)
2204
2205 def run(self, header, body, partial_args, app=None, interval=None,
2206 countdown=1, max_retries=None, eager=False,
2207 task_id=None, kwargs=None, **options):
2208 """Execute the chord.
2209
2210 Executing the chord means executing the header and sending the
2211 result to the body. In case of an empty header, the body is
2212 executed immediately.
2213
2214 Arguments:
2215 header (group): The header to execute.
2216 body (Signature): The body to execute.
2217 partial_args (tuple): Arguments to pass to the header.
2218 app (Celery): The Celery app instance.
2219 interval (float): The interval between retries.
2220 countdown (int): The countdown between retries.
2221 max_retries (int): The maximum number of retries.
2222 task_id (str): The task id to use for the body.
2223 kwargs (dict): Keyword arguments to pass to the header.
2224 options (dict): Options to pass to the header.
2225
2226 Returns:
2227 AsyncResult: The result of the body (with the result of the header in the parent of the body).
2228 """
2229 app = app or self._get_app(body)
2230 group_id = header.options.get('task_id') or uuid()
2231 root_id = body.options.get('root_id')
2232 options = dict(self.options, **options) if options else self.options
2233 if options:
2234 options.pop('task_id', None)
2235 body.options.update(options)
2236
2237 body_task_id = task_id or uuid()
2238 bodyres = body.freeze(body_task_id, group_id=group_id, root_id=root_id)
2239
2240 # Chains should not be passed to the header tasks. See #3771
2241 options.pop('chain', None)
2242 # Neither should chords, for deeply nested chords to work
2243 options.pop('chord', None)
2244 options.pop('task_id', None)
2245
2246 header_result_args = header._freeze_group_tasks(group_id=group_id, chord=body, root_id=root_id)
2247
2248 if header.tasks:
2249 app.backend.apply_chord(
2250 header_result_args,
2251 body,
2252 interval=interval,
2253 countdown=countdown,
2254 max_retries=max_retries,
2255 )
2256 header_result = header.apply_async(partial_args, kwargs, task_id=group_id, **options)
2257 # The execution of a chord body is normally triggered by its header's
2258 # tasks completing. If the header is empty this will never happen, so
2259 # we execute the body manually here.
2260 else:
2261 body.delay([])
2262 header_result = self.app.GroupResult(*header_result_args)

Callers 1

apply_asyncMethod · 0.95

Calls 10

_get_appMethod · 0.95
_freeze_group_tasksMethod · 0.80
GroupResultMethod · 0.80
getMethod · 0.45
popMethod · 0.45
updateMethod · 0.45
freezeMethod · 0.45
apply_chordMethod · 0.45
apply_asyncMethod · 0.45
delayMethod · 0.45

Tested by

no test coverage detected