MCPcopy
hub / github.com/celery/celery / detect_quorum_queues

Function detect_quorum_queues

celery/utils/quorum_queues.py:4–20  ·  view source on GitHub ↗

Detect if any of the queues are quorum queues. Returns: tuple[bool, str]: A tuple containing a boolean indicating if any of the queues are quorum queues and the name of the first quorum queue found or an empty string if no quorum queues were found.

(app, driver_type: str)

Source from the content-addressed store, hash-verified

2
3
4def detect_quorum_queues(app, driver_type: str) -> tuple[bool, str]:
5 """Detect if any of the queues are quorum queues.
6
7 Returns:
8 tuple[bool, str]: A tuple containing a boolean indicating if any of the queues are quorum queues
9 and the name of the first quorum queue found or an empty string if no quorum queues were found.
10 """
11 is_rabbitmq_broker = driver_type == 'amqp'
12
13 if is_rabbitmq_broker:
14 queues = app.amqp.queues
15 for qname in queues:
16 qarguments = queues[qname].queue_arguments or {}
17 if qarguments.get("x-queue-type") == "quorum":
18 return True, qname
19
20 return False, ""

Callers 6

qos_globalMethod · 0.90
include_ifMethod · 0.90
send_taskMethod · 0.85

Calls 1

getMethod · 0.45