(self)
| 264 | assert 'error' in r |
| 265 | |
def test_active_queues(self):
    """Check that ``active_queues`` reports the queues of the task consumer.

    A kombu ``Consumer`` holding the queues ``foo`` and ``bar`` is attached
    to a mocked worker consumer as ``task_consumer``. The panel's reply to
    the ``active_queues`` command must name exactly those two queues.
    """
    import kombu

    x = kombu.Consumer(self.app.connection_for_read(),
                       [kombu.Queue('foo', kombu.Exchange('foo'), 'foo'),
                        kombu.Queue('bar', kombu.Exchange('bar'), 'bar')],
                       auto_declare=False)
    consumer = Mock()
    consumer.task_consumer = x
    panel = self.create_panel(consumer=consumer)
    r = panel.handle('active_queues')
    # sorted() already returns a new list, so no extra list() wrapper is
    # needed; sorting keeps the check independent of the reply's order.
    assert sorted(q['name'] for q in r) == ['bar', 'foo']
| 278 | |
| 279 | def test_active_queues__empty(self): |
| 280 | consumer = Mock(name='consumer') |
nothing calls this directly
no test coverage detected