(self, manager)
| 1664 | |
| 1665 | @pytest.mark.xfail(reason="async_results aren't performed in async way") |
| 1666 | def test_redis_subscribed_channels_leak(self, manager): |
| 1667 | if not manager.app.conf.result_backend.startswith('redis'): |
| 1668 | raise pytest.skip('Requires redis result backend.') |
| 1669 | |
| 1670 | manager.app.backend.result_consumer.on_after_fork() |
| 1671 | initial_channels = get_active_redis_channels() |
| 1672 | initial_channels_count = len(initial_channels) |
| 1673 | total_chords = 10 |
| 1674 | async_results = [ |
| 1675 | chord([add.s(5, 6), add.s(6, 7)])(delayed_sum.s()) |
| 1676 | for _ in range(total_chords) |
| 1677 | ] |
| 1678 | |
| 1679 | channels_before = get_active_redis_channels() |
| 1680 | manager.assert_result_tasks_in_progress_or_completed(async_results) |
| 1681 | |
| 1682 | channels_before_count = len(channels_before) |
| 1683 | assert set(channels_before) != set(initial_channels) |
| 1684 | assert channels_before_count > initial_channels_count |
| 1685 | |
| 1686 | # The total number of active Redis channels at this point |
| 1687 | # is the number of chord header tasks multiplied by the |
| 1688 | # total chord tasks, plus the initial channels |
| 1689 | # (existing from previous tests). |
| 1690 | chord_header_task_count = 2 |
| 1691 | assert channels_before_count <= \ |
| 1692 | chord_header_task_count * total_chords + initial_channels_count |
| 1693 | |
| 1694 | result_values = [ |
| 1695 | result.get(timeout=TIMEOUT) |
| 1696 | for result in async_results |
| 1697 | ] |
| 1698 | assert result_values == [24] * total_chords |
| 1699 | |
| 1700 | channels_after = get_active_redis_channels() |
| 1701 | channels_after_count = len(channels_after) |
| 1702 | |
| 1703 | assert channels_after_count == initial_channels_count |
| 1704 | assert set(channels_after) == set(initial_channels) |
| 1705 | |
| 1706 | @flaky |
| 1707 | def test_replaced_nested_chord(self, manager): |
nothing calls this directly
no test coverage detected