MCPcopy
hub / github.com/celery/celery / example

Function example

examples/quorum-queues/myapp.py:71–145  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

69
70
71def example():
72 queue = my_quorum_queue.name if my_quorum_queue in (app.conf.task_queues or {}) else "celery"
73
74 while True:
75 print("Celery Quorum Queue Example")
76 print("===========================")
77 print("1. Send a simple identity task")
78 print("1.1 Send an ETA identity task")
79 print("2. Send a group of add tasks")
80 print("3. Inspect the active queues")
81 print("4. Shutdown Celery worker")
82 print("Q. Quit")
83 print("Q! Exit")
84 choice = input("Enter your choice (1-4 or Q): ")
85
86 if choice == "1" or choice == "1.1":
87 queue_type = "Quorum" if is_using_quorum_queues(app) else "Classic"
88 payload = f"Hello, {queue_type} Queue!"
89 eta = datetime.now(UTC) + timedelta(seconds=30)
90 if choice == "1.1":
91 result = identity.si(payload).apply_async(queue=queue, eta=eta)
92 else:
93 result = identity.si(payload).apply_async(queue=queue)
94 print()
95 print(f"Task sent with ID: {result.id}")
96 print("Task type: identity")
97
98 if choice == "1.1":
99 print(f"ETA: {eta}")
100
101 print(f"Payload: {payload}")
102
103 elif choice == "2":
104 tasks = [
105 (1, 2),
106 (3, 4),
107 (5, 6),
108 ]
109 result = group(
110 add.s(*tasks[0]),
111 add.s(*tasks[1]),
112 add.s(*tasks[2]),
113 ).apply_async(queue=queue)
114 print()
115 print("Group of tasks sent.")
116 print(f"Group result ID: {result.id}")
117 for i, task_args in enumerate(tasks, 1):
118 print(f"Task {i} type: add")
119 print(f"Payload: {task_args}")
120
121 elif choice == "3":
122 active_queues = app.control.inspect().active_queues()
123 print()
124 print("Active queues:")
125 for worker, queues in active_queues.items():
126 print(f"Worker: {worker}")
127 for q in queues:
128 print(f" - {q['name']}")

Callers

nothing calls this directly

Calls 11

group — Class · 0.90
is_using_quorum_queues — Function · 0.85
si — Method · 0.80
active_queues — Method · 0.80
abort — Method · 0.80
now — Method · 0.45
apply_async — Method · 0.45
s — Method · 0.45
inspect — Method · 0.45
items — Method · 0.45
shutdown — Method · 0.45

Tested by

no test coverage detected