MCPcopy
hub / github.com/benoitc/gunicorn / route_request

Method route_request

gunicorn/dirty/arbiter.py:445–495  ·  view source on GitHub ↗

Route a request to an available dirty worker via queue. Each worker has a dedicated queue and consumer task. Requests are submitted to the queue and processed sequentially by the consumer. For streaming responses, messages (chunks) are forwarded directly to

(self, request, client_writer)

Source from the content-addressed store, hash-verified

443 pass
444
445 async def route_request(self, request, client_writer):
446 """
447 Route a request to an available dirty worker via queue.
448
449 Each worker has a dedicated queue and consumer task. Requests are
450 submitted to the queue and processed sequentially by the consumer.
451
452 For streaming responses, messages (chunks) are forwarded directly
453 to the client_writer as they arrive from the worker.
454
455 Args:
456 request: Request message dict
457 client_writer: StreamWriter to send responses to client
458 """
459 request_id = request.get("id", "unknown")
460 app_path = request.get("app_path")
461
462 # Find an available worker (filtered by app if specified)
463 worker_pid = await self._get_available_worker(app_path)
464 if worker_pid is None:
465 # Distinguish between no workers at all vs. no workers for this app
466 if not self.workers:
467 error = DirtyError("No dirty workers available")
468 elif app_path and self.app_specs:
469 # Per-app allocation is configured and no workers have this app
470 error = DirtyNoWorkersAvailableError(app_path)
471 else:
472 error = DirtyError("No dirty workers available")
473 response = make_error_response(request_id, error)
474 await DirtyProtocol.write_message_async(client_writer, response)
475 return
476
477 # Get queue (start consumer if needed)
478 if worker_pid not in self.worker_queues:
479 await self._start_worker_consumer(worker_pid)
480
481 queue = self.worker_queues[worker_pid]
482 future = asyncio.get_running_loop().create_future()
483
484 # Submit request to queue with client writer for streaming support
485 await queue.put((request, client_writer, future))
486
487 # Wait for completion (streaming messages forwarded by consumer)
488 try:
489 await future
490 except Exception as e:
491 response = make_error_response(
492 request_id,
493 DirtyWorkerError(f"Request failed: {e}", worker_id=worker_pid)
494 )
495 await DirtyProtocol.write_message_async(client_writer, response)
496
497 async def _start_worker_consumer(self, worker_pid):
498 """Start a consumer task for a worker's request queue."""

Calls 9

_get_available_workerMethod · 0.95
DirtyErrorClass · 0.85
DirtyWorkerErrorClass · 0.85
putMethod · 0.80
make_error_responseFunction · 0.70
getMethod · 0.45
write_message_asyncMethod · 0.45