Handle a single request message. Supports both regular (non-streaming) and streaming responses. For streaming, detects if the result is a generator and sends chunk messages followed by an end message. Args: message: Request dict from protocol. writer: StreamWriter for sending responses.
(self, message, writer)
| 313 | pass |
| 314 | |
async def handle_request(self, message, writer):
    """
    Process one incoming protocol message and reply on ``writer``.

    Messages whose type is not ``DirtyProtocol.MSG_TYPE_REQUEST`` are
    answered with an error response and nothing is executed. Otherwise
    the requested action is run through ``self.execute``. A generator
    or async-generator result is handed to the matching streaming
    helper; any other result is sent back as a single response.

    Any exception raised while executing or while sending the result
    is logged with its traceback and reported to the peer as an error
    response.

    Args:
        message: Request dict from protocol. The keys read are "id",
            "type", "app_path", "action", "args" and "kwargs".
        writer: StreamWriter for sending responses
    """
    req_id = message.get("id", str(uuid.uuid4()))
    kind = message.get("type")

    # Reject anything that is not a request before touching the app.
    if msg_is_not_request := (kind != DirtyProtocol.MSG_TYPE_REQUEST):
        rejection = DirtyWorkerError(f"Unknown message type: {kind}")
        await DirtyProtocol.write_message_async(
            writer, make_error_response(req_id, rejection)
        )
        return

    target_app = message.get("app_path")
    target_action = message.get("action")
    call_args = message.get("args", [])
    call_kwargs = message.get("kwargs", {})

    # Update heartbeat before executing
    self.notify()

    try:
        outcome = await self.execute(
            target_app, target_action, call_args, call_kwargs
        )

        # Streaming results: delegate to the matching helper and stop.
        if inspect.isgenerator(outcome):
            await self._stream_sync_generator(req_id, outcome, writer)
            return
        if inspect.isasyncgen(outcome):
            await self._stream_async_generator(req_id, outcome, writer)
            return

        # Regular non-streaming response
        await DirtyProtocol.write_message_async(
            writer, make_response(req_id, outcome)
        )
    except Exception as exc:
        trace_text = traceback.format_exc()
        self.log.error("Error executing %s.%s: %s\n%s",
                       target_app, target_action, exc, trace_text)
        failure = DirtyAppError(
            str(exc),
            app_path=target_app,
            action=target_action,
            traceback=trace_text,
        )
        await DirtyProtocol.write_message_async(
            writer, make_error_response(req_id, failure)
        )
| 368 | |
| 369 | async def _stream_sync_generator(self, request_id, gen, writer): |
| 370 | """ |