MCPcopy
hub / github.com/celery/celery / unlock_chord

Function unlock_chord

celery/app/builtins.py:49–97  ·  view source on GitHub ↗
(self, group_id, callback, interval=None,
                     max_retries=None, result=None,
                     Result=app.AsyncResult, GroupResult=app.GroupResult,
                     result_from_tuple=result_from_tuple, **kwargs)

Source from the content-addressed store, hash-verified

47 @app.task(name='celery.chord_unlock', max_retries=None, shared=False,
48 default_retry_delay=app.conf.result_chord_retry_interval, ignore_result=True, lazy=False, bind=True)
49 def unlock_chord(self, group_id, callback, interval=None,
50 max_retries=None, result=None,
51 Result=app.AsyncResult, GroupResult=app.GroupResult,
52 result_from_tuple=result_from_tuple, **kwargs):
53 if interval is None:
54 interval = self.default_retry_delay
55
56 # check if the task group is ready, and if so apply the callback.
57 callback = maybe_signature(callback, app)
58 deps = GroupResult(
59 group_id,
60 [result_from_tuple(r, app=app) for r in result],
61 app=app,
62 )
63 j = deps.join_native if deps.supports_native_join else deps.join
64
65 try:
66 ready = deps.ready()
67 except Exception as exc:
68 raise self.retry(
69 exc=exc, countdown=interval, max_retries=max_retries,
70 )
71 else:
72 if not ready:
73 raise self.retry(countdown=interval, max_retries=max_retries)
74
75 callback = maybe_signature(callback, app=app)
76 try:
77 with allow_join_result():
78 ret = j(
79 timeout=app.conf.result_chord_join_timeout,
80 propagate=True,
81 )
82 except Exception as exc: # pylint: disable=broad-except
83 try:
84 culprit = next(deps._failed_join_report())
85 reason = f'Dependency {culprit.id} raised {exc!r}'
86 except StopIteration:
87 reason = repr(exc)
88 logger.exception('Chord %r raised: %r', group_id, exc)
89 chord_error = _create_chord_error_with_cause(message=reason, original_exc=exc)
90 app.backend.chord_error_from_stack(callback=callback, exc=chord_error)
91 else:
92 try:
93 callback.delay(ret)
94 except Exception as exc: # pylint: disable=broad-except
95 logger.exception('Chord %r raised: %r', group_id, exc)
96 chord_error = _create_chord_error_with_cause(message=f'Callback error: {exc!r}', original_exc=exc)
97 app.backend.chord_error_from_stack(callback=callback, exc=chord_error)
98 return unlock_chord
99
100

Callers 1

test_deps_ready_failsMethod · 0.85

Calls 10

maybe_signatureFunction · 0.90
result_from_tupleFunction · 0.90
allow_join_resultFunction · 0.90
GroupResultClass · 0.85
retryMethod · 0.80
readyMethod · 0.45
_failed_join_reportMethod · 0.45
delayMethod · 0.45

Tested by 1

test_deps_ready_failsMethod · 0.68