(app)
| 49 | |
| 50 | |
def is_using_quorum_queues(app) -> bool:
    """Report whether any queue configured on *app* is a quorum queue.

    Every entry of ``app.amqp.queues`` is looked up by name and its
    ``queue_arguments`` inspected; a queue counts as quorum when its
    ``"x-queue-type"`` argument equals ``"quorum"``.

    Args:
        app: Object exposing ``amqp.queues``, a mapping of queue name to
            a queue object that has a ``queue_arguments`` attribute.

    Returns:
        ``True`` as soon as one matching queue is found, ``False`` when
        there are no queues or none of them match.
    """
    declared = app.amqp.queues
    # Falsy ``queue_arguments`` (e.g. ``None``) is treated as "no arguments".
    return any(
        (declared[name].queue_arguments or {}).get("x-queue-type") == "quorum"
        for name in declared
    )
| 59 | |
| 60 | |
| 61 | @app.task |