(queue_name: str)
| 19 | |
| 20 | |
def create_app(queue_name: str) -> Celery:
    """Build a Celery app whose default queue is declared as a quorum queue.

    Connection URLs come from the environment:

    * ``TEST_BROKER`` and ``TEST_BACKEND``, when set, are used verbatim.
    * Otherwise the broker URL is assembled from ``RABBITMQ_DEFAULT_USER`` and
      ``RABBITMQ_DEFAULT_PASS`` (both default to ``"guest"``) against
      ``localhost:5672``, and the backend URL from ``REDIS_HOST`` /
      ``REDIS_PORT`` (defaults ``localhost`` / ``6379``), database ``0``.

    Args:
        queue_name: Name of the queue to declare; it is also set as the
            app's default queue.

    Returns:
        The configured ``Celery`` application.
    """
    # Function-scope import so this block does not depend on the file header.
    from urllib.parse import quote

    rabbitmq_user = os.environ.get("RABBITMQ_DEFAULT_USER", "guest")
    rabbitmq_pass = os.environ.get("RABBITMQ_DEFAULT_PASS", "guest")
    redis_host = os.environ.get("REDIS_HOST", "localhost")
    redis_port = os.environ.get("REDIS_PORT", "6379")

    # Percent-encode the credentials: a raw '@', ':', '/', '#' or '%' in the
    # user name or password would otherwise corrupt the userinfo part of the
    # URL. Plain values such as the default "guest" are left unchanged.
    broker_user = quote(rabbitmq_user, safe="")
    broker_pass = quote(rabbitmq_pass, safe="")

    broker_url = os.environ.get(
        "TEST_BROKER",
        f"pyamqp://{broker_user}:{broker_pass}@localhost:5672//",
    )
    backend_url = os.environ.get(
        "TEST_BACKEND",
        f"redis://{redis_host}:{redis_port}/0",
    )

    app = Celery("quorum_qos_race", broker=broker_url, backend=backend_url)

    # Single queue, declared with the "x-queue-type: quorum" argument.
    app.conf.task_queues = [
        Queue(
            name=queue_name,
            queue_arguments={"x-queue-type": "quorum"},
        )
    ]
    app.conf.task_default_queue = queue_name

    # Delivery / acknowledgement settings for this app.
    app.conf.worker_prefetch_multiplier = 1
    app.conf.task_acks_late = True
    app.conf.task_reject_on_worker_lost = True
    app.conf.broker_transport_options = {"confirm_publish": True}

    return app
| 45 | |
| 46 | |
| 47 | def dummy_task_factory(app: Celery, simulate_qos_issue: bool): |
no test coverage detected