Testing revoking of task
(self, manager)
| 234 | |
| 235 | @flaky |
| 236 | def test_revoked(self, manager): |
| 237 | """Testing revoking of task""" |
| 238 | # Fill the queue with tasks to fill the queue |
| 239 | for _ in range(4): |
| 240 | sleeping.delay(2) |
| 241 | # Execute task and revoke it |
| 242 | result = add.apply_async((1, 1)) |
| 243 | result.revoke() |
| 244 | with pytest.raises(celery.exceptions.TaskRevokedError): |
| 245 | result.get() |
| 246 | assert result.status == 'REVOKED' |
| 247 | assert result.ready() is True |
| 248 | assert result.failed() is False |
| 249 | assert result.successful() is False |
| 250 | |
| 251 | def test_revoked_by_headers_simple_canvas(self, manager): |
| 252 | """Testing revoking of task using a stamped header""" |
nothing calls this directly
no test coverage detected