MCPcopy
hub / github.com/celery/celery / run_worker

Function run_worker

t/integration/test_quorum_queue_qos_cluster_simulation.py:56–81  ·  view source on GitHub ↗
(simulate_qos_issue: bool, result_queue: multiprocessing.Queue)

Source from the content-addressed store, hash-verified

54
55
56def run_worker(simulate_qos_issue: bool, result_queue: multiprocessing.Queue):
57 queue_name = f"race_quorum_queue_{uuid.uuid4().hex}"
58 app = create_app(queue_name)
59 logger.info("[Celery config snapshot]:\n%s", pprint.pformat(dict(app.conf)))
60 task = dummy_task_factory(app, simulate_qos_issue)
61
62 try:
63 with start_worker(
64 app,
65 queues=[queue_name],
66 loglevel="INFO",
67 perform_ping_check=False,
68 shutdown_timeout=15,
69 ):
70 res = task.delay()
71 try:
72 result = res.get(timeout=10)
73 result_queue.put({"status": "ok", "result": result})
74 except Exception as e:
75 result_queue.put({"status": "error", "reason": str(e)})
76 except Exception as e:
77 logger.exception("[worker %s] external failure", simulate_qos_issue)
78 result_queue.put({"status": "external_failure", "reason": str(e)})
79 finally:
80 if result_queue.empty():
81 result_queue.put({"status": "crash", "reason": "Worker crashed without reporting"})
82
83
84@pytest.mark.amqp

Callers

nothing calls this directly

Calls 8

start_workerFunction · 0.90
create_appFunction · 0.85
dummy_task_factoryFunction · 0.85
emptyMethod · 0.80
infoMethod · 0.45
delayMethod · 0.45
getMethod · 0.45
putMethod · 0.45

Tested by

no test coverage detected