MCPcopy
hub / github.com/celery/celery / _dispatch_callbacks_and_chain

Function _dispatch_callbacks_and_chain

celery/app/trace.py:426–473  ·  view source on GitHub ↗

Dispatch callbacks and chain for a completed task. Dispatches link callbacks and then the next chain step. Does NOT fire task lifecycle signals (on_success, task_postrun) or call mark_as_done — callers handle those separately. Note: dispatch is not atomic. If callb

(
        retval, callbacks, chain, parent_id, root_id, priority,
    )

Source from the content-addressed store, hash-verified

424 return I, R, I.state, I.retval
425
426 def _dispatch_callbacks_and_chain(
427 retval, callbacks, chain, parent_id, root_id, priority,
428 ):
429 """Dispatch callbacks and chain for a completed task.
430
431 Dispatches link callbacks and then the next chain step.
432 Does NOT fire task lifecycle signals (on_success, task_postrun)
433 or call mark_as_done — callers handle those separately.
434
435 Note: dispatch is not atomic. If callbacks succeed but the
436 chain step fails (or vice-versa), a Reject + redeliver may
437 re-dispatch the already-sent callbacks. This is acceptable
438 under Celery's at-least-once delivery model.
439 """
440 if callbacks:
441 if len(callbacks) > 1:
442 sigs, groups = [], []
443 for sig in callbacks:
444 sig = signature(sig, app=app)
445 if isinstance(sig, group):
446 groups.append(sig)
447 else:
448 sigs.append(sig)
449 for group_ in groups:
450 group_.apply_async(
451 (retval,),
452 parent_id=parent_id, root_id=root_id,
453 priority=priority,
454 )
455 if sigs:
456 group(sigs, app=app).apply_async(
457 (retval,),
458 parent_id=parent_id, root_id=root_id,
459 priority=priority,
460 )
461 else:
462 signature(callbacks[0], app=app).apply_async(
463 (retval,),
464 parent_id=parent_id, root_id=root_id,
465 priority=priority,
466 )
467 if chain:
468 _chsig = signature(chain[-1], app=app)
469 _chsig.apply_async(
470 (retval,), chain=chain[:-1],
471 parent_id=parent_id, root_id=root_id,
472 priority=priority,
473 )
474
475 def trace_task(uuid, args, kwargs, request=None):
476 # R - is the possibly prepared return value.

Callers 1

trace_taskFunction · 0.85

Calls 3

groupFunction · 0.90
signatureFunction · 0.85
apply_asyncMethod · 0.45

Tested by

no test coverage detected