MCPcopy
hub / github.com/celery/celery / _handle_group_chord_error

Method _handle_group_chord_error

celery/backends/base.py:337–403  ·  view source on GitHub ↗

Handle chord errors when the callback is a group. When a chord header fails and the body is a group, we need to: (1) revoke all pending tasks in the group body; (2) mark them as failed with the chord error; (3) call error callbacks for each task. This prevents the group body tasks from hanging indefinitely (#8786).

(self, group_callback, backend, exc=None)

Source from the content-addressed store, hash-verified

335 return backend.fail_from_current_stack(callback.id, exc=exc)
336
337 def _handle_group_chord_error(self, group_callback, backend, exc=None):
338 """Handle chord errors when the callback is a group.
339
340 When a chord header fails and the body is a group, we need to:
341 1. Revoke all pending tasks in the group body
342 2. Mark them as failed with the chord error
343 3. Call error callbacks for each task
344
345 This prevents the group body tasks from hanging indefinitely (#8786)
346 """
347
348 # Extract original exception from ChordError if available
349 if isinstance(exc, ChordError) and hasattr(exc, '__cause__') and exc.__cause__:
350 original_exc = exc.__cause__
351 else:
352 original_exc = exc
353
354 try:
355 # Freeze the group to get the actual GroupResult with task IDs
356 frozen_group = group_callback.freeze()
357
358 if isinstance(frozen_group, GroupResult):
359 # revoke all tasks in the group to prevent execution
360 frozen_group.revoke()
361
362 # Handle each task in the group individually
363 for result in frozen_group.results:
364 try:
365 # Create fake request for error callbacks
366 fake_request = _create_fake_task_request(
367 task_id=result.id,
368 errbacks=group_callback.options.get("link_error", []),
369 task_name=getattr(result, 'task', 'unknown')
370 )
371
372 # Call error callbacks for this task with original exception
373 try:
374 backend._call_task_errbacks(fake_request, original_exc, None)
375 except Exception: # pylint: disable=broad-except
376 # continue on exception to be sure to iter to all the group tasks
377 pass
378
379 # Mark the individual task as failed with original exception
380 backend.fail_from_current_stack(result.id, exc=original_exc)
381
382 except Exception as task_exc: # pylint: disable=broad-except
383 # Log error but continue with other tasks
384 logger.exception(
385 'Failed to handle chord error for task %s: %r',
386 getattr(result, 'id', 'unknown'), task_exc
387 )
388
389 # Also mark the group itself as failed if it has an ID
390 frozen_group_id = getattr(frozen_group, 'id', None)
391 if frozen_group_id:
392 backend.mark_as_failure(frozen_group_id, original_exc)
393
394 return None

Calls 7

_call_task_errbacksMethod · 0.80
mark_as_failureMethod · 0.80
freezeMethod · 0.45
revokeMethod · 0.45
getMethod · 0.45