Stream chunks from an asynchronous generator. Args: request_id: Request ID for the messages gen: Async generator to iterate writer: StreamWriter for sending messages
(self, request_id, gen, writer)
| 415 | gen.close() |
| 416 | |
| 417 | async def _stream_async_generator(self, request_id, gen, writer): |
| 418 | """ |
| 419 | Stream chunks from an asynchronous generator. |
| 420 | |
| 421 | Args: |
| 422 | request_id: Request ID for the messages |
| 423 | gen: Async generator to iterate |
| 424 | writer: StreamWriter for sending messages |
| 425 | """ |
| 426 | try: |
| 427 | async for chunk in gen: |
| 428 | # Send chunk message |
| 429 | await DirtyProtocol.write_message_async( |
| 430 | writer, make_chunk_message(request_id, chunk) |
| 431 | ) |
| 432 | # Update heartbeat during long streams |
| 433 | self.notify() |
| 434 | # Send end message |
| 435 | await DirtyProtocol.write_message_async( |
| 436 | writer, make_end_message(request_id) |
| 437 | ) |
| 438 | except Exception as e: |
| 439 | # Error during streaming - send error message |
| 440 | tb = traceback.format_exc() |
| 441 | self.log.error("Error during streaming: %s\n%s", e, tb) |
| 442 | response = make_error_response( |
| 443 | request_id, |
| 444 | DirtyAppError(str(e), traceback=tb) |
| 445 | ) |
| 446 | await DirtyProtocol.write_message_async(writer, response) |
| 447 | finally: |
| 448 | await gen.aclose() |
| 449 | |
| 450 | async def execute(self, app_path, action, args, kwargs): |
| 451 | """ |
no test coverage detected