MCPcopy
hub / github.com/celery/celery / test_chain_on_error

Method test_chain_on_error

t/integration/test_canvas.py:213–229  ·  view source on GitHub ↗
(self, manager)

Source from the content-addressed store, hash-verified

211 chain([chain(sig)]).apply_async()
212
213 def test_chain_on_error(self, manager):
214 from .tasks import ExpectedException
215
216 if not manager.app.conf.result_backend.startswith('redis'):
217 raise pytest.skip('Requires redis result backend.')
218
219 # Run the chord and wait for the error callback to finish.
220 c1 = chain(
221 add.s(1, 2), fail.s(), add.s(3, 4),
222 )
223 res = c1()
224
225 with pytest.raises(ExpectedException):
226 res.get(propagate=True)
227
228 with pytest.raises(ExpectedException):
229 res.parent.get(propagate=True)
230
231 @flaky
232 def test_chain_inside_group_receives_arguments(self, manager):

Callers

nothing calls this directly

Calls 4

chainClass · 0.90
sMethod · 0.45
raisesMethod · 0.45
getMethod · 0.45

Tested by

no test coverage detected