MCPcopy
hub / github.com/celery/celery / test_add__cancel_consumer

Method test_add__cancel_consumer

t/unit/worker/test_control.py:374–403  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

372 assert 'error' in r
373
374 def test_add__cancel_consumer(self):
375
376 class MockConsumer:
377 queues = []
378 canceled = []
379 consuming = False
380 hub = Mock(name='hub')
381
382 def add_queue(self, queue):
383 self.queues.append(queue.name)
384
385 def consume(self):
386 self.consuming = True
387
388 def cancel_by_queue(self, queue):
389 self.canceled.append(queue)
390
391 def consuming_from(self, queue):
392 return queue in self.queues
393
394 consumer = Consumer(self.app)
395 consumer.task_consumer = MockConsumer()
396 panel = self.create_panel(consumer=consumer)
397
398 panel.handle('add_consumer', {'queue': 'MyQueue'})
399 assert 'MyQueue' in consumer.task_consumer.queues
400 assert consumer.task_consumer.consuming
401 panel.handle('add_consumer', {'queue': 'MyQueue'})
402 panel.handle('cancel_consumer', {'queue': 'MyQueue'})
403 assert 'MyQueue' in consumer.task_consumer.canceled
404
405 def test_revoked(self):
406 worker_state.revoked.clear()

Callers

nothing calls this directly

Calls 4

create_panelMethod · 0.95
MockConsumerClass · 0.85
ConsumerClass · 0.70
handleMethod · 0.45

Tested by

no test coverage detected