Handle chord errors when the callback is a group. When a chord header fails and the body is a group, we need to: 1. Revoke all pending tasks in the group body; 2. Mark them as failed with the chord error; 3. Call error callbacks for each task. This prevents the group body tasks from hanging indefinitely (#8786).
(self, group_callback, backend, exc=None)
| 335 | return backend.fail_from_current_stack(callback.id, exc=exc) |
| 336 | |
| 337 | def _handle_group_chord_error(self, group_callback, backend, exc=None): |
| 338 | """Handle chord errors when the callback is a group. |
| 339 | |
| 340 | When a chord header fails and the body is a group, we need to: |
| 341 | 1. Revoke all pending tasks in the group body |
| 342 | 2. Mark them as failed with the chord error |
| 343 | 3. Call error callbacks for each task |
| 344 | |
| 345 | This prevents the group body tasks from hanging indefinitely (#8786) |
| 346 | """ |
| 347 | |
| 348 | # Extract original exception from ChordError if available |
| 349 | if isinstance(exc, ChordError) and hasattr(exc, '__cause__') and exc.__cause__: |
| 350 | original_exc = exc.__cause__ |
| 351 | else: |
| 352 | original_exc = exc |
| 353 | |
| 354 | try: |
| 355 | # Freeze the group to get the actual GroupResult with task IDs |
| 356 | frozen_group = group_callback.freeze() |
| 357 | |
| 358 | if isinstance(frozen_group, GroupResult): |
| 359 | # revoke all tasks in the group to prevent execution |
| 360 | frozen_group.revoke() |
| 361 | |
| 362 | # Handle each task in the group individually |
| 363 | for result in frozen_group.results: |
| 364 | try: |
| 365 | # Create fake request for error callbacks |
| 366 | fake_request = _create_fake_task_request( |
| 367 | task_id=result.id, |
| 368 | errbacks=group_callback.options.get("link_error", []), |
| 369 | task_name=getattr(result, 'task', 'unknown') |
| 370 | ) |
| 371 | |
| 372 | # Call error callbacks for this task with original exception |
| 373 | try: |
| 374 | backend._call_task_errbacks(fake_request, original_exc, None) |
| 375 | except Exception: # pylint: disable=broad-except |
| 376 | # continue on exception to be sure to iter to all the group tasks |
| 377 | pass |
| 378 | |
| 379 | # Mark the individual task as failed with original exception |
| 380 | backend.fail_from_current_stack(result.id, exc=original_exc) |
| 381 | |
| 382 | except Exception as task_exc: # pylint: disable=broad-except |
| 383 | # Log error but continue with other tasks |
| 384 | logger.exception( |
| 385 | 'Failed to handle chord error for task %s: %r', |
| 386 | getattr(result, 'id', 'unknown'), task_exc |
| 387 | ) |
| 388 | |
| 389 | # Also mark the group itself as failed if it has an ID |
| 390 | frozen_group_id = getattr(frozen_group, 'id', None) |
| 391 | if frozen_group_id: |
| 392 | backend.mark_as_failure(frozen_group_id, original_exc) |
| 393 | |
| 394 | return None |