Testing revoking of task
(self, inspect)
| 209 | |
@flaky
def test_revoked(self, inspect):
    """Verify that a revoked task shows up in ``inspect.revoked()``.

    Submits a handful of ``sleeping`` tasks, then submits an ``add`` task
    and revokes it right away. The revoked-task report obtained from the
    ``inspect`` fixture must contain exactly one entry, and the entry for
    ``NODENAME`` must include the id of the revoked task.
    """
    # Submit slow tasks ahead of the one under test.
    # NOTE(review): presumably this keeps the worker busy so the task
    # below is still waiting when the revoke arrives -- confirm.
    for _ in range(4):
        sleeping.delay(2)

    # Submit the task under test and revoke it immediately.
    revoked_task = add.apply_async((1, 1))
    revoked_task.revoke()

    revoked_by_node = inspect.revoked()

    # Exactly one entry is expected in the report, and the entry for
    # NODENAME must list the task that was just revoked.
    assert len(revoked_by_node) == 1
    assert revoked_task.task_id in revoked_by_node[NODENAME]
| 222 | |
| 223 | @flaky |
| 224 | def test_conf(self, inspect): |
nothing calls this directly
no test coverage detected