(simulate_qos_issue: bool, result_queue: multiprocessing.Queue)
| 54 | |
| 55 | |
def run_worker(simulate_qos_issue: bool, result_queue: multiprocessing.Queue):
    """Start an embedded worker, run one dummy task, and report the outcome.

    Exactly which status is reported depends on where the run fails:

    * ``{"status": "ok", "result": ...}`` -- the task result was fetched.
    * ``{"status": "error", "reason": ...}`` -- fetching the result raised.
    * ``{"status": "external_failure", "reason": ...}`` -- an exception
      escaped the worker context (raised on entry, in ``task.delay()``, or
      on exit).
    * ``{"status": "crash", "reason": ...}`` -- nothing was reported before
      the function unwound (e.g. a ``BaseException`` such as
      ``KeyboardInterrupt``).

    Args:
        simulate_qos_issue: Forwarded to ``dummy_task_factory`` and used as
            the tag in the failure log line.
        result_queue: Queue on which the status dictionaries are put.
    """
    queue_name = f"race_quorum_queue_{uuid.uuid4().hex}"
    app = create_app(queue_name)
    logger.info("[Celery config snapshot]:\n%s", pprint.pformat(dict(app.conf)))
    task = dummy_task_factory(app, simulate_qos_issue)

    # Track reporting locally instead of asking the queue.
    # multiprocessing.Queue.empty() is unreliable: put() hands the item to a
    # feeder thread, so empty() may still be True right after a put, and the
    # consumer may already have drained the item. Either case would emit a
    # spurious "crash" message after a real result.
    reported = False

    try:
        with start_worker(
            app,
            queues=[queue_name],
            loglevel="INFO",
            perform_ping_check=False,
            shutdown_timeout=15,
        ):
            res = task.delay()
            try:
                result = res.get(timeout=10)
                result_queue.put({"status": "ok", "result": result})
                reported = True
            except Exception as e:
                result_queue.put({"status": "error", "reason": str(e)})
                reported = True
    except Exception as e:
        # An exception escaping the worker context is still reported even
        # when a task result was already queued, so the failure stays visible.
        logger.exception("[worker %s] external failure", simulate_qos_issue)
        result_queue.put({"status": "external_failure", "reason": str(e)})
        reported = True
    finally:
        if not reported:
            result_queue.put({"status": "crash", "reason": "Worker crashed without reporting"})
| 82 | |
| 83 | |
| 84 | @pytest.mark.amqp |
nothing calls this directly
no test coverage detected