hub / github.com/celery/celery / test_revoked

Method test_revoked

t/integration/test_tasks.py:236–249

Testing revoking of task

(self, manager)

Source from the content-addressed store, hash-verified

@flaky
def test_revoked(self, manager):
    """Testing revoking of task"""
    # Fill the queue with slow tasks so the next one has to wait
    for _ in range(4):
        sleeping.delay(2)
    # Execute task and revoke it
    result = add.apply_async((1, 1))
    result.revoke()
    with pytest.raises(celery.exceptions.TaskRevokedError):
        result.get()
    assert result.status == 'REVOKED'
    assert result.ready() is True
    assert result.failed() is False
    assert result.successful() is False

def test_revoked_by_headers_simple_canvas(self, manager):
    """Testing revoking of task using a stamped header"""
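The four assertions at the end of test_revoked describe how a revoked result is expected to report itself: finished, but neither failed nor successful. The sketch below models that relationship with the standard library only. ToyResult and its methods are hypothetical stand-ins, not Celery's AsyncResult, and the set of "ready" states is an assumption chosen to match what the test asserts.

```python
# Toy model of the result-state predicates exercised by test_revoked.
# Everything here is illustrative; it does not talk to a broker or worker.

PENDING = "PENDING"
SUCCESS = "SUCCESS"
FAILURE = "FAILURE"
REVOKED = "REVOKED"

# Assumption: a result counts as "ready" once it reaches any terminal state.
READY_STATES = frozenset({SUCCESS, FAILURE, REVOKED})


class ToyResult:
    """Hypothetical stand-in for a task result handle."""

    def __init__(self):
        self.status = PENDING

    def revoke(self):
        # In this toy model only a task that has not finished can be revoked.
        if self.status == PENDING:
            self.status = REVOKED

    def ready(self):
        return self.status in READY_STATES

    def failed(self):
        return self.status == FAILURE

    def successful(self):
        return self.status == SUCCESS


result = ToyResult()
result.revoke()
summary = (result.status, result.ready(), result.failed(), result.successful())
print(summary)
```

In this model, revocation is a terminal state of its own, which is why ready() can be true while both failed() and successful() are false.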

Callers

nothing calls this directly

Calls 8

delay · Method · 0.45
apply_async · Method · 0.45
revoke · Method · 0.45
raises · Method · 0.45
get · Method · 0.45
ready · Method · 0.45
failed · Method · 0.45
successful · Method · 0.45

Tested by

no test coverage detected