(self)
| 888 | assert name == "celery" |
| 889 | |
| 890 | def test_detect_quorum_queues_negative(self): |
| 891 | c = self.c |
| 892 | self.c.connection.transport.driver_type = 'amqp' |
| 893 | c.app.amqp.queues = {"celery": Mock(queue_arguments=None)} |
| 894 | result, name = detect_quorum_queues(c.app, c.connection.transport.driver_type) |
| 895 | assert not result |
| 896 | assert name == "" |
| 897 | |
| 898 | def test_detect_quorum_queues_not_rabbitmq(self): |
| 899 | c = self.c |
nothing calls this directly
no test coverage detected