hub / github.com/celery/celery / test_nested_chain_group_last

Method test_nested_chain_group_last

t/integration/test_canvas.py:563–572

Test that a final group in a chain with preceding tasks completes.

(self, manager)

Source from the content-addressed store, hash-verified

561         assert res.get(timeout=TIMEOUT) == [42, 42]
562
563     def test_nested_chain_group_last(self, manager):
564         """
565         Test that a final group in a chain with preceding tasks completes.
566         """
567         sig = chain(
568             identity.s(42),  # 42
569             group(identity.s(), identity.s()),  # [42, 42]
570         )
571         res = sig.delay()
572         assert res.get(timeout=TIMEOUT) == [42, 42]
573
574     def test_chain_replaced_with_a_chain_and_a_callback(self, manager):
575         if not manager.app.conf.result_backend.startswith('redis'):
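
Illustrative sketch (not part of the indexed source)

The test above expects 42 to flow from the first task into both members of the final group, giving [42, 42]. The plain-Python model below sketches that data flow only. It is not Celery's implementation and needs no broker or worker; `Group` and `run_chain` are hypothetical names introduced for illustration, and `identity` is assumed to simply return its argument, as the inline comments in the test suggest.

```python
# Plain-Python model of the canvas shape exercised by the test.
# NOT Celery's implementation: Group and run_chain are hypothetical
# stand-ins used only to show how the result flows through the canvas.


def identity(x):
    """Assumed behaviour of the suite's identity task: return the input."""
    return x


class Group:
    """Hypothetical stand-in for a group: fan one input out to many callables."""

    def __init__(self, *funcs):
        self.funcs = funcs

    def __call__(self, value):
        # Every member receives the same upstream result.
        return [func(value) for func in self.funcs]


def run_chain(first_value, *steps):
    """Hypothetical stand-in for a chain: feed each result to the next step."""
    result = first_value
    for step in steps:
        result = step(result)
    return result


# Same shape as the test: one preceding task, then a final group of two.
result = run_chain(42, identity, Group(identity, identity))
print(result)  # [42, 42]
```

The real test additionally checks that the result arrives within TIMEOUT when the final step of the chain is a group, which this synchronous model cannot exercise.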

Callers

nothing calls this directly

Calls (5)

chain · Class · 0.90
group · Class · 0.90
s · Method · 0.45
delay · Method · 0.45
get · Method · 0.45

Tested by

no test coverage detected