MCPcopy
hub / github.com/celery/celery / test_chord_on_error

Method test_chord_on_error

t/integration/test_canvas.py:1951–2025  ·  view source on GitHub ↗
(self, manager)

Source from the content-addressed store, hash-verified

1949 assert parent_id is None
1950
1951 def test_chord_on_error(self, manager):
1952 from celery import states
1953
1954 from .tasks import ExpectedException
1955
1956 if not manager.app.conf.result_backend.startswith('redis'):
1957 raise pytest.skip('Requires redis result backend.')
1958
1959 # Run the chord and wait for the error callback to finish. Note that
1960 # this only works for old style callbacks since they get dispatched to
1961 # run async while new style errbacks are called synchronously so that
1962 # they can be passed the request object for the failing task.
1963 c1 = chord(
1964 header=[add.s(1, 2), add.s(3, 4), fail.s()],
1965 body=print_unicode.s('This should not be called').on_error(
1966 errback_old_style.s()),
1967 )
1968 res = c1()
1969 with pytest.raises(ExpectedException):
1970 res.get(propagate=True)
1971
1972 # Got to wait for children to populate.
1973 check = (
1974 lambda: res.children,
1975 lambda: res.children[0].children,
1976 lambda: res.children[0].children[0].result,
1977 )
1978 start = monotonic()
1979 while not all(f() for f in check):
1980 if monotonic() > start + TIMEOUT:
1981 raise TimeoutError("Timed out waiting for children")
1982 sleep(0.1)
1983
1984 # Extract the results of the successful tasks from the chord.
1985 #
1986 # We could do this inside the error handler, and probably would in a
1987 # real system, but for the purposes of the test it's obnoxious to get
1988 # data out of the error handler.
1989 #
1990 # So for clarity of our test, we instead do it here.
1991
1992 # Use the error callback's result to find the failed task.
1993 uuid_patt = re.compile(
1994 r"[0-9A-Fa-f]{8}-([0-9A-Fa-f]{4}-){3}[0-9A-Fa-f]{12}"
1995 )
1996 callback_chord_exc = AsyncResult(
1997 res.children[0].children[0].result
1998 ).result
1999 failed_task_id = uuid_patt.search(str(callback_chord_exc))
2000 assert (failed_task_id is not None), "No task ID in %r" % callback_chord_exc
2001 failed_task_id = failed_task_id.group()
2002
2003 # Use new group_id result metadata to get group ID.
2004 failed_task_result = AsyncResult(failed_task_id)
2005 original_group_id = failed_task_result._get_task_meta()['group_id']
2006
2007 # Use group ID to get preserved group result.
2008 backend = fail.app.backend

Callers

nothing calls this directly

Calls 14

_get_task_meta · Method · 0.95
chord · Function · 0.90
TimeoutError · Class · 0.90
AsyncResult · Class · 0.90
get_redis_connection · Function · 0.85
get_key_for_group · Method · 0.80
zrange · Method · 0.80
lrange · Method · 0.80
s · Method · 0.45
on_error · Method · 0.45
raises · Method · 0.45
get · Method · 0.45

Tested by

no test coverage detected