Test that a lone group in a chain completes.
(self, manager)
| 534 | |
@pytest.mark.xfail(raises=TimeoutError, reason="Task is timeout")
def test_nested_chain_group_lone(self, manager):  # Fails with Redis 5.x
    """
    A chain whose only member is a group must still run to completion.

    The group holds two ``identity.s(42)`` signatures, so the chain's
    result is expected to be ``[42, 42]``.  The test is marked ``xfail``
    for ``TimeoutError`` (see the decorator's ``reason``).
    """
    expected = [42, 42]
    # Build the lone group first, then wrap it in a chain by itself.
    lone_group = group(identity.s(42), identity.s(42))
    async_result = chain(lone_group).delay()
    # Wait at most a tenth of the suite-wide TIMEOUT for the result.
    assert async_result.get(timeout=TIMEOUT / 10) == expected
| 545 | |
| 546 | def test_nested_chain_group_mid(self, manager): |
| 547 | """ |