MCPcopy
hub / github.com/celery/celery / test_group_kwargs

Method test_group_kwargs

t/integration/test_canvas.py:1769–1780  ·  view source on GitHub ↗
(self, manager)

Source from the content-addressed store, hash-verified

1767 assert res.get(timeout=TIMEOUT) == [12, 13, 14, 15]
1768
1769 def test_group_kwargs(self, manager):
1770 try:
1771 manager.app.backend.ensure_chords_allowed()
1772 except NotImplementedError as e:
1773 raise pytest.skip(e.args[0])
1774 c = (
1775 add.s(2, 2) |
1776 group(add.s(i) for i in range(4)) |
1777 add_to_all.s(8)
1778 )
1779 res = c.apply_async(kwargs={"z": 1})
1780 assert res.get(timeout=TIMEOUT) == [13, 14, 15, 16]
1781
1782 def test_group_args_and_kwargs(self, manager):
1783 try:

Callers

nothing calls this directly

Calls 5

groupClass · 0.90
ensure_chords_allowedMethod · 0.45
sMethod · 0.45
apply_asyncMethod · 0.45
getMethod · 0.45

Tested by

no test coverage detected