Handle a connection from an HTTP worker. Routes requests to available dirty workers and returns responses. Supports both regular responses and streaming (chunk-based) responses. Also handles stash (shared state) operations.
(self, reader, writer)
| 403 | await self.manage_workers() |
| 404 | |
| 405 | async def handle_client(self, reader, writer): |
| 406 | """ |
| 407 | Handle a connection from an HTTP worker. |
| 408 | |
| 409 | Routes requests to available dirty workers and returns responses. |
| 410 | Supports both regular responses and streaming (chunk-based) responses. |
| 411 | Also handles stash (shared state) operations. |
| 412 | """ |
| 413 | self.log.debug("New client connection from HTTP worker") |
| 414 | |
| 415 | try: |
| 416 | while self.alive: |
| 417 | try: |
| 418 | message = await DirtyProtocol.read_message_async(reader) |
| 419 | except asyncio.IncompleteReadError: |
| 420 | break |
| 421 | |
| 422 | msg_type = message.get("type") |
| 423 | |
| 424 | # Handle stash operations |
| 425 | if msg_type == DirtyProtocol.MSG_TYPE_STASH: |
| 426 | await self.handle_stash_request(message, writer) |
| 427 | # Handle status queries |
| 428 | elif msg_type == DirtyProtocol.MSG_TYPE_STATUS: |
| 429 | await self.handle_status_request(message, writer) |
| 430 | # Handle worker management (add/remove workers) |
| 431 | elif msg_type == DirtyProtocol.MSG_TYPE_MANAGE: |
| 432 | await self.handle_manage_request(message, writer) |
| 433 | else: |
| 434 | # Route request to a dirty worker - pass writer for streaming |
| 435 | await self.route_request(message, writer) |
| 436 | except Exception as e: |
| 437 | self.log.error("Client connection error: %s", e) |
| 438 | finally: |
| 439 | writer.close() |
| 440 | try: |
| 441 | await writer.wait_closed() |
| 442 | except Exception: |
| 443 | pass |
| 444 | |
| 445 | async def route_request(self, request, client_writer): |
| 446 | """ |