MCPcopy
hub / github.com/celery/celery / is_using_quorum_queues

Function is_using_quorum_queues

examples/quorum-queues/myapp.py:51–58  ·  view source on GitHub ↗
(app)

Source from the content-addressed store, hash-verified

49
50
51def is_using_quorum_queues(app) -> bool:
52 queues = app.amqp.queues
53 for qname in queues:
54 qarguments = queues[qname].queue_arguments or {}
55 if qarguments.get("x-queue-type") == "quorum":
56 return True
57
58 return False
59
60
61@app.task

Callers 1

exampleFunction · 0.85

Calls 1

getMethod · 0.45

Tested by

no test coverage detected