hub / github.com/celery/celery / test_detect_quorum_queues_negative

Method test_detect_quorum_queues_negative

t/unit/worker/test_consumer.py:890–896
(self)

Source from the content-addressed store, hash-verified

888          assert name == "celery"
889
890      def test_detect_quorum_queues_negative(self):
891          c = self.c
892          self.c.connection.transport.driver_type = 'amqp'
893          c.app.amqp.queues = {"celery": Mock(queue_arguments=None)}
894          result, name = detect_quorum_queues(c.app, c.connection.transport.driver_type)
895          assert not result
896          assert name == ""
897
898      def test_detect_quorum_queues_not_rabbitmq(self):
899          c = self.c
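
Illustrative sketch

The listing above checks that an 'amqp' transport whose only queue has queue_arguments=None is reported as (falsy, ""). The sketch below reproduces that check without Celery installed. detect_quorum_queues_sketch and make_app are hypothetical stand-ins, not Celery's code; the sketch assumes that a queue counts as a quorum queue when its arguments set RabbitMQ's "x-queue-type" to "quorum", which this listing does not confirm.

```python
from types import SimpleNamespace
from unittest.mock import Mock


def detect_quorum_queues_sketch(app, driver_type):
    """Hypothetical stand-in for detect_quorum_queues (an assumption, not Celery's code).

    Returns (True, queue_name) for the first queue that declares a quorum
    type, otherwise (False, "").
    """
    # Assumption: only the 'amqp' driver type is inspected at all.
    if driver_type != "amqp":
        return False, ""
    for name, queue in app.amqp.queues.items():
        # queue_arguments may be None, as in the negative test above.
        arguments = queue.queue_arguments or {}
        if arguments.get("x-queue-type") == "quorum":
            return True, name
    return False, ""


def make_app(queues):
    """Build a minimal object exposing app.amqp.queues (hypothetical helper)."""
    return SimpleNamespace(amqp=SimpleNamespace(queues=queues))


# Negative case, mirroring the test: no queue arguments at all.
negative_app = make_app({"celery": Mock(queue_arguments=None)})
negative = detect_quorum_queues_sketch(negative_app, "amqp")

# Positive case for contrast: the queue declares itself as a quorum queue.
positive_app = make_app(
    {"celery": Mock(queue_arguments={"x-queue-type": "quorum"})}
)
positive = detect_quorum_queues_sketch(positive_app, "amqp")
```

Mock(queue_arguments=None) sets the attribute explicitly, so the stand-in has to treat None as "no arguments" instead of calling .get on it.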

Callers

nothing calls this directly

Calls 1

detect_quorum_queues · Function · 0.90

Tested by

no test coverage detected