hub / github.com/celery/celery / test_complex_chain

Method test_complex_chain

t/integration/test_canvas.py:185–193  ·  t/integration/test_canvas.py::test_chain.test_complex_chain
(self, manager)

Source from the content-addressed store, hash-verified

    @flaky
    def test_complex_chain(self, manager):
        g = group(add.s(i) for i in range(4))
        c = (
            add.s(2, 2) | (
                add.s(4) | add_replaced.s(8) | add.s(16) | add.s(32)
            ) | g
        )
        res = c()
        assert res.get(timeout=TIMEOUT) == [64, 65, 66, 67]

    @pytest.mark.xfail(raises=TimeoutError, reason="Task is timeout")
    def test_group_results_in_chain(self, manager):
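
The expected list in the assertion can be checked by hand. The sketch below is plain Python with no Celery app or broker involved. `add` and `run_chain` are local stand-ins written for illustration, not the project's tasks. It assumes that `add_replaced` ends up producing the same sum as `add`, which its name suggests but which this listing does not show.

```python
from functools import reduce


def add(x, y):
    """Local stand-in for the `add` task: returns x + y."""
    return x + y


def run_chain(first_args, later_args):
    """Mimic a chain: each later step gets the previous result as its first argument."""
    start = add(*first_args)
    return reduce(lambda acc, arg: add(acc, arg), later_args, start)


# add.s(2, 2) | add.s(4) | add_replaced.s(8) | add.s(16) | add.s(32)
# Assumption: add_replaced contributes the same value as add would.
chain_result = run_chain((2, 2), [4, 8, 16, 32])

# The trailing group fans the chain result out to add.s(i) for i in range(4).
group_result = [add(chain_result, i) for i in range(4)]

print(chain_result)
print(group_result)
```

Under that assumption the chain accumulates 4, 8, 16, 32, 64, and the group then adds 0 through 3 to the final value, which matches the list the test asserts.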

Callers

nothing calls this directly

Calls 3

group · Class · 0.90
s · Method · 0.45
get · Method · 0.45

Tested by

no test coverage detected