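Dispatch callbacks and chain for a completed task. Dispatches link callbacks and then the next chain step. Does NOT fire task lifecycle signals (on_success, task_postrun) or call mark_as_done; callers handle those separately. Note: dispatch is not atomic. If callbacks succeed but the chain step fails (or vice-versa), a Reject + redeliver may re-dispatch the already-sent callbacks. This is acceptable under Celery's at-least-once delivery model.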
(
retval, callbacks, chain, parent_id, root_id, priority,
)
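The docstring's note on non-atomic dispatch can be illustrated with a toy model. The sketch below does not use Celery: `Broker`, `dispatch`, and `run_with_redelivery` are hypothetical names, and the retry loop only approximates what a Reject + redeliver might look like. It assumes the callback publish succeeds while the first chain-step publish fails.

```python
class Broker:
    """Hypothetical in-memory broker that records every published message."""

    def __init__(self, fail_on=None):
        self.sent = []
        self.fail_on = fail_on  # name of the message whose first publish fails

    def publish(self, name):
        if name == self.fail_on:
            self.fail_on = None  # fail only once
            raise ConnectionError(f"could not publish {name}")
        self.sent.append(name)


def dispatch(broker, callbacks, chain_step):
    # Callbacks go out first, then the chain step; nothing wraps both
    # in a transaction, so a failure in between leaves callbacks sent.
    for cb in callbacks:
        broker.publish(cb)
    broker.publish(chain_step)


def run_with_redelivery(broker, callbacks, chain_step, max_attempts=2):
    for attempt in range(1, max_attempts + 1):
        try:
            dispatch(broker, callbacks, chain_step)
            return attempt
        except ConnectionError:
            continue  # stands in for Reject + redeliver of the task message
    raise RuntimeError("dispatch never succeeded")


broker = Broker(fail_on="next_step")
attempts = run_with_redelivery(broker, ["on_done"], "next_step")
print(attempts, broker.sent)
```

In this model the callback `on_done` is published twice and the chain step once, which is the duplicate the note describes. Consumers of such callbacks would need to tolerate repeats, for example by being idempotent.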
| 424 | return I, R, I.state, I.retval |
| 425 | |
| 426 | def _dispatch_callbacks_and_chain( |
| 427 | retval, callbacks, chain, parent_id, root_id, priority, |
| 428 | ): |
| 429 | """Dispatch callbacks and chain for a completed task. |
| 430 | |
| 431 | Dispatches link callbacks and then the next chain step. |
| 432 | Does NOT fire task lifecycle signals (on_success, task_postrun) |
| 433 | or call mark_as_done — callers handle those separately. |
| 434 | |
| 435 | Note: dispatch is not atomic. If callbacks succeed but the |
| 436 | chain step fails (or vice-versa), a Reject + redeliver may |
| 437 | re-dispatch the already-sent callbacks. This is acceptable |
| 438 | under Celery's at-least-once delivery model. |
| 439 | """ |
| 440 | if callbacks: |
| 441 | if len(callbacks) > 1: |
| 442 | sigs, groups = [], [] |
| 443 | for sig in callbacks: |
| 444 | sig = signature(sig, app=app) |
| 445 | if isinstance(sig, group): |
| 446 | groups.append(sig) |
| 447 | else: |
| 448 | sigs.append(sig) |
| 449 | for group_ in groups: |
| 450 | group_.apply_async( |
| 451 | (retval,), |
| 452 | parent_id=parent_id, root_id=root_id, |
| 453 | priority=priority, |
| 454 | ) |
| 455 | if sigs: |
| 456 | group(sigs, app=app).apply_async( |
| 457 | (retval,), |
| 458 | parent_id=parent_id, root_id=root_id, |
| 459 | priority=priority, |
| 460 | ) |
| 461 | else: |
| 462 | signature(callbacks[0], app=app).apply_async( |
| 463 | (retval,), |
| 464 | parent_id=parent_id, root_id=root_id, |
| 465 | priority=priority, |
| 466 | ) |
| 467 | if chain: |
| 468 | _chsig = signature(chain[-1], app=app) |
| 469 | _chsig.apply_async( |
| 470 | (retval,), chain=chain[:-1], |
| 471 | parent_id=parent_id, root_id=root_id, |
| 472 | priority=priority, |
| 473 | ) |
| 474 | |
| 475 | def trace_task(uuid, args, kwargs, request=None): |
| 476 | # R - is the possibly prepared return value. |
no test coverage detected
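Since no tests cover this helper, it may help to spell out the dispatch order the listing implies. The following is a minimal, Celery-free sketch: `FakeSig`, `FakeGroup`, and `plan_dispatch` are hypothetical stand-ins that only record what would be sent, and they ignore `parent_id`, `root_id`, and `priority`. It mirrors the branches in the listing under the assumption that the next chain step is the last element of `chain`, as `chain[-1]` suggests.

```python
from dataclasses import dataclass, field


@dataclass
class FakeSig:
    """Hypothetical stand-in for a plain task signature."""
    name: str


@dataclass
class FakeGroup:
    """Hypothetical stand-in for a group of signatures."""
    members: list = field(default_factory=list)


def plan_dispatch(retval, callbacks, chain):
    """Return (target, args, remaining_chain) tuples in send order."""
    sends = []
    if callbacks:
        if len(callbacks) > 1:
            groups = [cb for cb in callbacks if isinstance(cb, FakeGroup)]
            sigs = [cb for cb in callbacks if not isinstance(cb, FakeGroup)]
            # Pre-existing groups are sent one by one, ahead of plain signatures.
            for g in groups:
                sends.append((g, (retval,), None))
            # Plain signatures are bundled into a single new group.
            if sigs:
                sends.append((FakeGroup(sigs), (retval,), None))
        else:
            sends.append((callbacks[0], (retval,), None))
    if chain:
        # The last element runs next; the rest of the chain travels with it.
        sends.append((chain[-1], (retval,), chain[:-1]))
    return sends


a, b, c = FakeSig("a"), FakeSig("b"), FakeSig("c")
g = FakeGroup([FakeSig("x")])
nxt = FakeSig("next")
sends = plan_dispatch(42, [a, g, b], [c, nxt])
```

A real test would more likely patch `apply_async` on the signatures and assert on the recorded calls; the plan above is only meant to make the expected ordering explicit.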