hub / github.com/celery/celery / test_active_queues

Method test_active_queues

t/unit/worker/test_control.py:266–277
(self)

Source from the content-addressed store, hash-verified

264          assert 'error' in r
265
266      def test_active_queues(self):
267          import kombu
268
269          x = kombu.Consumer(self.app.connection_for_read(),
270                             [kombu.Queue('foo', kombu.Exchange('foo'), 'foo'),
271                              kombu.Queue('bar', kombu.Exchange('bar'), 'bar')],
272                             auto_declare=False)
273          consumer = Mock()
274          consumer.task_consumer = x
275          panel = self.create_panel(consumer=consumer)
276          r = panel.handle('active_queues')
277          assert list(sorted(q['name'] for q in r)) == ['bar', 'foo']
278
279      def test_active_queues__empty(self):
280          consumer = Mock(name='consumer')
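
The pattern the test above exercises can be sketched without Celery or kombu installed. This is only an illustration under stated assumptions: `FakeQueue` and `fake_active_queues` are hypothetical stand-ins invented here, not Celery's actual `active_queues` handler, and the real test goes through `create_panel` and `panel.handle` instead. The sketch shows a mocked consumer whose `task_consumer` exposes queues, a handler that returns one dict per queue, and an assertion on sorted names so the check does not depend on return order.

```python
from unittest.mock import Mock


class FakeQueue:
    """Hypothetical stand-in for kombu.Queue; it only carries a name."""

    def __init__(self, name):
        self.name = name

    def as_dict(self):
        # Return the minimal mapping the assertion below relies on.
        return {'name': self.name}


def fake_active_queues(consumer):
    """Hypothetical handler: one dict per queue on consumer.task_consumer."""
    task_consumer = consumer.task_consumer
    if not task_consumer:
        # No task consumer attached: report no active queues.
        return []
    return [q.as_dict() for q in task_consumer.queues]


# Mirror the test setup: a mocked consumer with two queues attached.
consumer = Mock()
consumer.task_consumer = Mock(queues=[FakeQueue('foo'), FakeQueue('bar')])

r = fake_active_queues(consumer)
names = sorted(q['name'] for q in r)
assert names == ['bar', 'foo']
```

Sorting the names before comparing keeps the assertion independent of the order in which the handler yields queues, which is the same choice the original test makes.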

Callers

nothing calls this directly

Calls 4

create_panel · Method · 0.95
Consumer · Method · 0.80
connection_for_read · Method · 0.45
handle · Method · 0.45

Tested by

no test coverage detected