hub / github.com/celery/celery / test_chain_of_explicit_chords

Method test_chain_of_explicit_chords

t/integration/test_canvas.py:430–448
(self, manager)

Source from the content-addressed store, hash-verified

@flaky
def test_chain_of_explicit_chords(self, manager):
    try:
        manager.app.backend.ensure_chords_allowed()
    except NotImplementedError as e:
        raise pytest.skip(e.args[0])

    c1 = chain(
        chord(group(add.si(1, 0), add.si(1, 0)), tsum.s()),
        chord(group(add.s(1), add.s(1)), tsum.s()),
        chord(group(add.s(0), add.s(0)), tsum.s()),
    )
    c2 = chain(
        chord(group(add.s(10), add.s(10)), tsum.s()),
        chord(group(add.s(0), add.s(0)), tsum.s()),
        chord(group(add.s(1), add.s(1)), tsum.s()),
    )
    c = c1 | c2
    res = c()
    assert res.get(timeout=TIMEOUT) == 178
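The expected value 178 can be checked by hand. The following is a minimal sketch that needs neither Celery nor a broker; it assumes that the `add` task returns `x + y` and that `tsum` returns the sum of a list, which is not shown on this page. The helpers `s`, `si`, `run_chord` and `run_chain` are hypothetical stand-ins written for this illustration, not Celery APIs. They only mimic two behaviours: a mutable signature (`.s`) receives the previous result as its first argument, while an immutable one (`.si`) ignores it.

```python
def add(x, y):
    # Assumed behaviour of the integration-test task `add`.
    return x + y


def tsum(nums):
    # Assumed behaviour of the integration-test task `tsum`.
    return sum(nums)


def s(*args):
    # Hypothetical stand-in for a mutable signature:
    # the previous result is prepended to the stored arguments.
    return lambda prev: add(prev, *args)


def si(*args):
    # Hypothetical stand-in for an immutable signature:
    # the previous result is ignored.
    return lambda prev: add(*args)


def run_chord(header, prev=None):
    # Run every header task, then reduce the results with the body (tsum).
    return tsum([task(prev) for task in header])


def run_chain(chords):
    # Feed each chord's result into the next chord; record every step.
    result = None
    trace = []
    for header in chords:
        result = run_chord(header, result)
        trace.append(result)
    return trace


c1 = [[si(1, 0), si(1, 0)], [s(1), s(1)], [s(0), s(0)]]
c2 = [[s(10), s(10)], [s(0), s(0)], [s(1), s(1)]]

# `c1 | c2` is modelled here as list concatenation.
trace = run_chain(c1 + c2)
print(trace)  # [2, 6, 12, 44, 88, 178]
```

Under these assumptions each chord doubles its input after adding the per-task offset, so the six intermediate results are 2, 6, 12, 44, 88 and 178, and the last one matches the assertion in the test.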

Callers

nothing calls this directly

Calls 7

chain · Class · 0.90
chord · Function · 0.90
group · Class · 0.90
si · Method · 0.80
ensure_chords_allowed · Method · 0.45
s · Method · 0.45
get · Method · 0.45

Tested by

no test coverage detected