Tests basic task call
(self, manager)
| 74 | |
@flaky
def test_basic_task(self, manager):
    """Exercise plain ``add.delay`` calls and verify each result.

    Runs two phases of ten calls each: first with positional arguments
    only, then with positional arguments plus the ``z`` keyword. Within
    a phase every task is dispatched before any result is awaited.
    """

    def check_all(pending):
        # Each entry pairs the expected value with the pending result.
        for want, outcome in pending:
            got = outcome.get(timeout=10)
            assert got == want
            assert outcome.status == 'SUCCESS'
            assert outcome.ready() is True
            assert outcome.successful() is True

    # Phase 1: positional arguments only; expected value is n + n.
    check_all([(n + n, add.delay(n, n)) for n in range(10)])

    # Phase 2: positional arguments plus the z kwarg; expected value is 3*n.
    check_all([(3 * n, add.delay(n, n, z=n)) for n in range(10)])
| 99 | |
| 100 | @flaky |
| 101 | @pytest.mark.skip(reason="Broken test") |
nothing calls this directly
no test coverage detected