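Execute a request on a specific worker (called by the consumer). Handles both regular responses and streaming (chunk-based) responses. For streaming, chunk and end messages are forwarded directly to the client_writer as they arrive from the worker; an end, response, or error message completes the request. If no message arrives from the worker within the configured dirty_timeout, a timeout error response is written to the client_writer instead.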
(self, worker_pid, request, client_writer)
| 521 | self.worker_consumers[worker_pid] = task |
| 522 | |
| 523 | async def _execute_on_worker(self, worker_pid, request, client_writer): |
| 524 | """ |
| 525 | Execute request on a specific worker (called by consumer). |
| 526 | |
| 527 | Handles both regular responses and streaming (chunk-based) responses. |
| 528 | For streaming, chunk and end messages are forwarded directly to the |
| 529 | client_writer as they arrive from the worker. |
| 530 | """ |
| 531 | request_id = request.get("id", "unknown") |
| 532 | |
| 533 | try: |
| 534 | reader, writer = await self._get_worker_connection(worker_pid) |
| 535 | await DirtyProtocol.write_message_async(writer, request) |
| 536 | |
| 537 | # Read messages until we get a response, end, or error |
| 538 | while True: |
| 539 | try: |
| 540 | message = await asyncio.wait_for( |
| 541 | DirtyProtocol.read_message_async(reader), |
| 542 | timeout=self.cfg.dirty_timeout |
| 543 | ) |
| 544 | except asyncio.TimeoutError: |
| 545 | response = make_error_response( |
| 546 | request_id, |
| 547 | DirtyTimeoutError("Worker timeout", self.cfg.dirty_timeout) |
| 548 | ) |
| 549 | await DirtyProtocol.write_message_async(client_writer, response) |
| 550 | return |
| 551 | |
| 552 | msg_type = message.get("type") |
| 553 | |
| 554 | # Forward chunk messages to client |
| 555 | if msg_type == DirtyProtocol.MSG_TYPE_CHUNK: |
| 556 | await DirtyProtocol.write_message_async(client_writer, message) |
| 557 | continue |
| 558 | |
| 559 | # Forward end message and complete |
| 560 | if msg_type == DirtyProtocol.MSG_TYPE_END: |
| 561 | await DirtyProtocol.write_message_async(client_writer, message) |
| 562 | return |
| 563 | |
| 564 | # Forward response or error and complete |
| 565 | if msg_type in (DirtyProtocol.MSG_TYPE_RESPONSE, |
| 566 | DirtyProtocol.MSG_TYPE_ERROR): |
| 567 | await DirtyProtocol.write_message_async(client_writer, message) |
| 568 | return |
| 569 | |
| 570 | # Unknown message type - log and continue |
| 571 | self.log.warning("Unknown message type from worker: %s", msg_type) |
| 572 | |
| 573 | except Exception as e: |
| 574 | self.log.error("Error executing on worker %s: %s", worker_pid, e) |
| 575 | self._close_worker_connection(worker_pid) |
| 576 | response = make_error_response( |
| 577 | request_id, |
| 578 | DirtyWorkerError(f"Worker communication failed: {e}", |
| 579 | worker_id=worker_pid) |
| 580 | ) |