MCPcopy
hub / github.com/celery/celery / create_app

Function create_app

t/integration/test_quorum_queue_qos_cluster_simulation.py:21–44  ·  view source on GitHub ↗
(queue_name: str)

Source from the content-addressed store, hash-verified

19
20
def create_app(queue_name: str) -> Celery:
    """Build the Celery app used by the quorum-queue QoS simulation.

    The broker and result-backend URLs come from ``TEST_BROKER`` and
    ``TEST_BACKEND`` when those environment variables are set. Otherwise
    they are assembled from ``RABBITMQ_DEFAULT_USER`` /
    ``RABBITMQ_DEFAULT_PASS`` (both defaulting to ``guest``) against
    ``localhost:5672``, and from ``REDIS_HOST`` / ``REDIS_PORT``
    (defaulting to ``localhost`` / ``6379``), database ``0``.

    Args:
        queue_name: Name of the single queue to register; it is also set
            as the app's default queue.

    Returns:
        The configured ``Celery`` application.
    """
    rabbitmq_user = os.getenv("RABBITMQ_DEFAULT_USER", "guest")
    rabbitmq_pass = os.getenv("RABBITMQ_DEFAULT_PASS", "guest")
    redis_host = os.getenv("REDIS_HOST", "localhost")
    redis_port = os.getenv("REDIS_PORT", "6379")

    # Only used when TEST_BROKER / TEST_BACKEND are not set.
    fallback_broker = f"pyamqp://{rabbitmq_user}:{rabbitmq_pass}@localhost:5672//"
    fallback_backend = f"redis://{redis_host}:{redis_port}/0"

    app = Celery(
        "quorum_qos_race",
        broker=os.getenv("TEST_BROKER", fallback_broker),
        backend=os.getenv("TEST_BACKEND", fallback_backend),
    )

    # The queue is declared with the ``x-queue-type: quorum`` argument.
    quorum_queue = Queue(name=queue_name, queue_arguments={"x-queue-type": "quorum"})

    app.conf.update(
        task_queues=[quorum_queue],
        task_default_queue=queue_name,
        worker_prefetch_multiplier=1,
        task_acks_late=True,
        task_reject_on_worker_lost=True,
        broker_transport_options={"confirm_publish": True},
    )

    return app
45
46
47def dummy_task_factory(app: Celery, simulate_qos_issue: bool):

Callers 1

run_worker — Function · 0.85

Calls 3

Celery — Class · 0.90
Queue — Class · 0.85
get — Method · 0.45

Tested by

no test coverage detected