(self, manager)
| 1949 | assert parent_id is None |
| 1950 | |
| 1951 | def test_chord_on_error(self, manager): |
| 1952 | from celery import states |
| 1953 | |
| 1954 | from .tasks import ExpectedException |
| 1955 | |
| 1956 | if not manager.app.conf.result_backend.startswith('redis'): |
| 1957 | raise pytest.skip('Requires redis result backend.') |
| 1958 | |
| 1959 | # Run the chord and wait for the error callback to finish. Note that |
| 1960 | # this only works for old style callbacks since they get dispatched to |
| 1961 | # run async while new style errbacks are called synchronously so that |
| 1962 | # they can be passed the request object for the failing task. |
| 1963 | c1 = chord( |
| 1964 | header=[add.s(1, 2), add.s(3, 4), fail.s()], |
| 1965 | body=print_unicode.s('This should not be called').on_error( |
| 1966 | errback_old_style.s()), |
| 1967 | ) |
| 1968 | res = c1() |
| 1969 | with pytest.raises(ExpectedException): |
| 1970 | res.get(propagate=True) |
| 1971 | |
| 1972 | # Got to wait for children to populate. |
| 1973 | check = ( |
| 1974 | lambda: res.children, |
| 1975 | lambda: res.children[0].children, |
| 1976 | lambda: res.children[0].children[0].result, |
| 1977 | ) |
| 1978 | start = monotonic() |
| 1979 | while not all(f() for f in check): |
| 1980 | if monotonic() > start + TIMEOUT: |
| 1981 | raise TimeoutError("Timed out waiting for children") |
| 1982 | sleep(0.1) |
| 1983 | |
| 1984 | # Extract the results of the successful tasks from the chord. |
| 1985 | # |
| 1986 | # We could do this inside the error handler, and probably would in a |
| 1987 | # real system, but for the purposes of the test it's obnoxious to get |
| 1988 | # data out of the error handler. |
| 1989 | # |
| 1990 | # So for clarity of our test, we instead do it here. |
| 1991 | |
| 1992 | # Use the error callback's result to find the failed task. |
| 1993 | uuid_patt = re.compile( |
| 1994 | r"[0-9A-Fa-f]{8}-([0-9A-Fa-f]{4}-){3}[0-9A-Fa-f]{12}" |
| 1995 | ) |
| 1996 | callback_chord_exc = AsyncResult( |
| 1997 | res.children[0].children[0].result |
| 1998 | ).result |
| 1999 | failed_task_id = uuid_patt.search(str(callback_chord_exc)) |
| 2000 | assert (failed_task_id is not None), "No task ID in %r" % callback_chord_exc |
| 2001 | failed_task_id = failed_task_id.group() |
| 2002 | |
| 2003 | # Use new group_id result metadata to get group ID. |
| 2004 | failed_task_result = AsyncResult(failed_task_id) |
| 2005 | original_group_id = failed_task_result._get_task_meta()['group_id'] |
| 2006 | |
| 2007 | # Use group ID to get preserved group result. |
| 2008 | backend = fail.app.backend |
nothing calls this directly
no test coverage detected