MCPcopy
hub / github.com/celery/celery / test_revoked

Method test_revoked

t/unit/worker/test_control.py:405–413  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

403 assert 'MyQueue' in consumer.task_consumer.canceled
404
405 def test_revoked(self):
406 worker_state.revoked.clear()
407 worker_state.revoked.add('a1')
408 worker_state.revoked.add('a2')
409
410 try:
411 assert sorted(self.panel.handle('dump_revoked')) == ['a1', 'a2']
412 finally:
413 worker_state.revoked.clear()
414
415 def test_dump_schedule(self):
416 consumer = Consumer(self.app)

Callers

nothing calls this directly

Calls 3

clearMethod · 0.45
addMethod · 0.45
handleMethod · 0.45

Tested by

no test coverage detected