Route a request to an available dirty worker via queue. Each worker has a dedicated queue and consumer task. Requests are submitted to the queue and processed sequentially by the consumer. For streaming responses, messages (chunks) are forwarded directly to the client_writer as they arrive from the worker.
(self, request, client_writer)
| 443 | pass |
| 444 | |
async def route_request(self, request, client_writer):
    """
    Dispatch a request to a dirty worker through that worker's queue.

    Every worker owns one queue and one consumer task; the consumer
    handles the queued requests one after another. The request is
    enqueued together with the client writer and a future, and this
    coroutine then waits on that future.

    Chunks of a streaming response are not written here: the consumer
    forwards them to ``client_writer`` as they arrive from the worker.

    If no worker can be selected, an error response is written to
    ``client_writer`` and the coroutine returns. If waiting on the
    future raises, a ``DirtyWorkerError`` response is written instead.

    Args:
        request: Request message dict (``"id"`` and ``"app_path"`` are
            read from it; a missing id falls back to ``"unknown"``).
        client_writer: StreamWriter used to send responses to the client.
    """
    app_path = request.get("app_path")
    req_id = request.get("id", "unknown")

    # Pick a worker, restricted to the requested app when one is given.
    pid = await self._get_available_worker(app_path)

    if pid is None:
        # The app-specific error only applies when workers exist at all
        # and per-app allocation is configured; otherwise report the
        # generic "no workers" condition.
        if self.workers and app_path and self.app_specs:
            failure = DirtyNoWorkersAvailableError(app_path)
        else:
            failure = DirtyError("No dirty workers available")
        reply = make_error_response(req_id, failure)
        await DirtyProtocol.write_message_async(client_writer, reply)
        return

    # Lazily start the consumer (and its queue) for this worker.
    if pid not in self.worker_queues:
        await self._start_worker_consumer(pid)
    pending = self.worker_queues[pid]

    loop = asyncio.get_running_loop()
    completion = loop.create_future()

    # The writer travels with the request so the consumer can stream
    # chunks straight to the client.
    await pending.put((request, client_writer, completion))

    # Block until the consumer settles the future.
    try:
        await completion
    except Exception as exc:
        failure = DirtyWorkerError(f"Request failed: {exc}", worker_id=pid)
        reply = make_error_response(req_id, failure)
        await DirtyProtocol.write_message_async(client_writer, reply)
| 496 | |
| 497 | async def _start_worker_consumer(self, worker_pid): |
| 498 | """Start a consumer task for a worker's request queue.""" |