MCPcopy
hub / github.com/celery/celery / test_nested_chain_group_mid

Method test_nested_chain_group_mid

t/integration/test_canvas.py:546–561  ·  view source on GitHub ↗

Test that a mid-point group in a chain completes.

(self, manager)

Source from the content-addressed store, hash-verified

544 assert res.get(timeout=TIMEOUT / 10) == [42, 42]
545
546 def test_nested_chain_group_mid(self, manager):
547 """
548 Test that a mid-point group in a chain completes.
549 """
550 try:
551 manager.app.backend.ensure_chords_allowed()
552 except NotImplementedError as e:
553 raise pytest.skip(e.args[0])
554
555 sig = chain(
556 identity.s(42), # 42
557 group(identity.s(), identity.s()), # [42, 42]
558 identity.s(), # [42, 42]
559 )
560 res = sig.delay()
561 assert res.get(timeout=TIMEOUT) == [42, 42]
562
563 def test_nested_chain_group_last(self, manager):
564 """

Callers

nothing calls this directly

Calls 6

chainClass · 0.90
groupClass · 0.90
ensure_chords_allowedMethod · 0.45
sMethod · 0.45
delayMethod · 0.45
getMethod · 0.45

Tested by

no test coverage detected