()
| 69 | |
| 70 | |
| 71 | def example(): |
| 72 | queue = my_quorum_queue.name if my_quorum_queue in (app.conf.task_queues or {}) else "celery" |
| 73 | |
| 74 | while True: |
| 75 | print("Celery Quorum Queue Example") |
| 76 | print("===========================") |
| 77 | print("1. Send a simple identity task") |
| 78 | print("1.1 Send an ETA identity task") |
| 79 | print("2. Send a group of add tasks") |
| 80 | print("3. Inspect the active queues") |
| 81 | print("4. Shutdown Celery worker") |
| 82 | print("Q. Quit") |
| 83 | print("Q! Exit") |
| 84 | choice = input("Enter your choice (1-4 or Q): ") |
| 85 | |
| 86 | if choice == "1" or choice == "1.1": |
| 87 | queue_type = "Quorum" if is_using_quorum_queues(app) else "Classic" |
| 88 | payload = f"Hello, {queue_type} Queue!" |
| 89 | eta = datetime.now(UTC) + timedelta(seconds=30) |
| 90 | if choice == "1.1": |
| 91 | result = identity.si(payload).apply_async(queue=queue, eta=eta) |
| 92 | else: |
| 93 | result = identity.si(payload).apply_async(queue=queue) |
| 94 | print() |
| 95 | print(f"Task sent with ID: {result.id}") |
| 96 | print("Task type: identity") |
| 97 | |
| 98 | if choice == "1.1": |
| 99 | print(f"ETA: {eta}") |
| 100 | |
| 101 | print(f"Payload: {payload}") |
| 102 | |
| 103 | elif choice == "2": |
| 104 | tasks = [ |
| 105 | (1, 2), |
| 106 | (3, 4), |
| 107 | (5, 6), |
| 108 | ] |
| 109 | result = group( |
| 110 | add.s(*tasks[0]), |
| 111 | add.s(*tasks[1]), |
| 112 | add.s(*tasks[2]), |
| 113 | ).apply_async(queue=queue) |
| 114 | print() |
| 115 | print("Group of tasks sent.") |
| 116 | print(f"Group result ID: {result.id}") |
| 117 | for i, task_args in enumerate(tasks, 1): |
| 118 | print(f"Task {i} type: add") |
| 119 | print(f"Payload: {task_args}") |
| 120 | |
| 121 | elif choice == "3": |
| 122 | active_queues = app.control.inspect().active_queues() |
| 123 | print() |
| 124 | print("Active queues:") |
| 125 | for worker, queues in active_queues.items(): |
| 126 | print(f"Worker: {worker}") |
| 127 | for q in queues: |
| 128 | print(f" - {q['name']}") |
nothing calls this directly
no test coverage detected