| 47 | @app.task(name='celery.chord_unlock', max_retries=None, shared=False, |
| 48 | default_retry_delay=app.conf.result_chord_retry_interval, ignore_result=True, lazy=False, bind=True) |
| 49 | def unlock_chord(self, group_id, callback, interval=None, |
| 50 | max_retries=None, result=None, |
| 51 | Result=app.AsyncResult, GroupResult=app.GroupResult, |
| 52 | result_from_tuple=result_from_tuple, **kwargs): |
| 53 | if interval is None: |
| 54 | interval = self.default_retry_delay |
| 55 | |
| 56 | # check if the task group is ready, and if so apply the callback. |
| 57 | callback = maybe_signature(callback, app) |
| 58 | deps = GroupResult( |
| 59 | group_id, |
| 60 | [result_from_tuple(r, app=app) for r in result], |
| 61 | app=app, |
| 62 | ) |
| 63 | j = deps.join_native if deps.supports_native_join else deps.join |
| 64 | |
| 65 | try: |
| 66 | ready = deps.ready() |
| 67 | except Exception as exc: |
| 68 | raise self.retry( |
| 69 | exc=exc, countdown=interval, max_retries=max_retries, |
| 70 | ) |
| 71 | else: |
| 72 | if not ready: |
| 73 | raise self.retry(countdown=interval, max_retries=max_retries) |
| 74 | |
| 75 | callback = maybe_signature(callback, app=app) |
| 76 | try: |
| 77 | with allow_join_result(): |
| 78 | ret = j( |
| 79 | timeout=app.conf.result_chord_join_timeout, |
| 80 | propagate=True, |
| 81 | ) |
| 82 | except Exception as exc: # pylint: disable=broad-except |
| 83 | try: |
| 84 | culprit = next(deps._failed_join_report()) |
| 85 | reason = f'Dependency {culprit.id} raised {exc!r}' |
| 86 | except StopIteration: |
| 87 | reason = repr(exc) |
| 88 | logger.exception('Chord %r raised: %r', group_id, exc) |
| 89 | chord_error = _create_chord_error_with_cause(message=reason, original_exc=exc) |
| 90 | app.backend.chord_error_from_stack(callback=callback, exc=chord_error) |
| 91 | else: |
| 92 | try: |
| 93 | callback.delay(ret) |
| 94 | except Exception as exc: # pylint: disable=broad-except |
| 95 | logger.exception('Chord %r raised: %r', group_id, exc) |
| 96 | chord_error = _create_chord_error_with_cause(message=f'Callback error: {exc!r}', original_exc=exc) |
| 97 | app.backend.chord_error_from_stack(callback=callback, exc=chord_error) |
| 98 | return unlock_chord |
| 99 | |
| 100 | |