hub / github.com/celery/celery / test_revoked

Method test_revoked

t/integration/test_inspect.py:211–221

Testing revoking of task

(self, inspect)

Source

@flaky
def test_revoked(self, inspect):
    """Testing revoking of task"""
    # Keep the worker busy with sleeping tasks so the next task stays queued
    for _ in range(4):
        sleeping.delay(2)
    # Submit a task and revoke it before it runs
    result = add.apply_async((1, 1))
    result.revoke()
    # inspect.revoked() returns a mapping of node name to revoked task ids
    ret = inspect.revoked()
    assert len(ret) == 1
    assert result.task_id in ret[NODENAME]
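
The two assertions above treat the value returned by `inspect.revoked()` as a mapping from worker node name to a collection of revoked task ids, as the `ret[NODENAME]` lookup suggests. A minimal, standard-library-only sketch of that check follows. The helper `revoked_on_node`, the node name, and the task ids are all hypothetical and are not part of Celery or its test suite. The guard for an empty reply is an assumption about what happens when no worker answers.

```python
from typing import Mapping, Optional, Sequence


def revoked_on_node(
    reply: Optional[Mapping[str, Sequence[str]]],
    node: str,
    task_id: str,
) -> bool:
    """Return True if `task_id` is listed as revoked by `node`.

    Hypothetical helper: `reply` is assumed to have the shape the test
    indexes into, i.e. {node_name: [task_id, ...]}.
    """
    # Assumption: an empty or missing reply means no worker answered.
    if not reply:
        return False
    # A node absent from the reply has reported no revoked tasks.
    return task_id in reply.get(node, ())


# Hand-written stand-in for a reply; the node name and ids are made up.
sample_reply = {"celery@example-host": ["a1b2", "c3d4"]}

print(revoked_on_node(sample_reply, "celery@example-host", "a1b2"))
print(revoked_on_node(None, "celery@example-host", "a1b2"))
```

Unlike the test, which indexes `ret[NODENAME]` directly and would raise on a missing node or an empty reply, the sketch returns `False` in both cases. That may be preferable in a polling loop, whereas a test arguably benefits from failing loudly.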

Callers

nothing calls this directly

Calls 4

delay · Method · 0.45
apply_async · Method · 0.45
revoke · Method · 0.45
revoked · Method · 0.45

Tested by

no test coverage detected