MCPcopy
hub / github.com/benoitc/gunicorn / _execute_on_worker

Method _execute_on_worker

gunicorn/dirty/arbiter.py:523–581  ·  view source on GitHub ↗

Execute request on a specific worker (called by consumer). Handles both regular responses and streaming (chunk-based) responses. For streaming, chunk and end messages are forwarded directly to the client_writer as they arrive from the worker.

(self, worker_pid, request, client_writer)

Source from the content-addressed store, hash-verified

521 self.worker_consumers[worker_pid] = task
522
523 async def _execute_on_worker(self, worker_pid, request, client_writer):
524 """
525 Execute request on a specific worker (called by consumer).
526
527 Handles both regular responses and streaming (chunk-based) responses.
528 For streaming, chunk and end messages are forwarded directly to the
529 client_writer as they arrive from the worker.
530 """
531 request_id = request.get("id", "unknown")
532
533 try:
534 reader, writer = await self._get_worker_connection(worker_pid)
535 await DirtyProtocol.write_message_async(writer, request)
536
537 # Read messages until we get a response, end, or error
538 while True:
539 try:
540 message = await asyncio.wait_for(
541 DirtyProtocol.read_message_async(reader),
542 timeout=self.cfg.dirty_timeout
543 )
544 except asyncio.TimeoutError:
545 response = make_error_response(
546 request_id,
547 DirtyTimeoutError("Worker timeout", self.cfg.dirty_timeout)
548 )
549 await DirtyProtocol.write_message_async(client_writer, response)
550 return
551
552 msg_type = message.get("type")
553
554 # Forward chunk messages to client
555 if msg_type == DirtyProtocol.MSG_TYPE_CHUNK:
556 await DirtyProtocol.write_message_async(client_writer, message)
557 continue
558
559 # Forward end message and complete
560 if msg_type == DirtyProtocol.MSG_TYPE_END:
561 await DirtyProtocol.write_message_async(client_writer, message)
562 return
563
564 # Forward response or error and complete
565 if msg_type in (DirtyProtocol.MSG_TYPE_RESPONSE,
566 DirtyProtocol.MSG_TYPE_ERROR):
567 await DirtyProtocol.write_message_async(client_writer, message)
568 return
569
570 # Unknown message type - log and continue
571 self.log.warning("Unknown message type from worker: %s", msg_type)
572
573 except Exception as e:
574 self.log.error("Error executing on worker %s: %s", worker_pid, e)
575 self._close_worker_connection(worker_pid)
576 response = make_error_response(
577 request_id,
578 DirtyWorkerError(f"Worker communication failed: {e}",
579 worker_id=worker_pid)
580 )

Calls 10

DirtyTimeoutError · Class · 0.85
DirtyWorkerError · Class · 0.85
make_error_response · Function · 0.70
get · Method · 0.45
write_message_async · Method · 0.45
read_message_async · Method · 0.45
warning · Method · 0.45
error · Method · 0.45