Stream chunks from a synchronous generator. Args: request_id: Request ID for the messages; gen: Sync generator to iterate; writer: StreamWriter for sending messages.
(self, request_id, gen, writer)
| 367 | await DirtyProtocol.write_message_async(writer, response) |
| 368 | |
async def _stream_sync_generator(self, request_id, gen, writer):
    """
    Stream chunks from a synchronous generator.

    Each ``next(gen)`` call is submitted to ``self._executor`` through
    ``loop.run_in_executor`` so that a slow generator does not block the
    event loop. Every produced chunk is written as a chunk message; once
    the generator is exhausted an end message is written. If an
    ``Exception`` is raised while iterating or writing, it is logged and
    an error response is written instead. The generator is closed on
    every exit path.

    Args:
        request_id: Request ID for the messages
        gen: Sync generator to iterate
        writer: StreamWriter for sending messages
    """
    # Sentinel value to detect end of generator
    # (StopIteration cannot be raised into a Future in Python 3.7+)
    _EXHAUSTED = object()

    def _get_next():
        try:
            return next(gen)
        except StopIteration:
            return _EXHAUSTED

    try:
        loop = asyncio.get_running_loop()
        while True:
            # Run next() in executor to avoid blocking event loop
            chunk = await loop.run_in_executor(self._executor, _get_next)
            if chunk is _EXHAUSTED:
                break
            # Send chunk message
            await DirtyProtocol.write_message_async(
                writer, make_chunk_message(request_id, chunk)
            )
            # Update heartbeat during long streams
            self.notify()
        # Send end message
        await DirtyProtocol.write_message_async(
            writer, make_end_message(request_id)
        )
    except Exception as e:
        # Error during streaming - send error message
        tb = traceback.format_exc()
        self.log.error("Error during streaming: %s\n%s", e, tb)
        response = make_error_response(
            request_id,
            DirtyAppError(str(e), traceback=tb)
        )
        await DirtyProtocol.write_message_async(writer, response)
    finally:
        # asyncio.CancelledError is a BaseException on Python 3.8+, so a
        # cancellation that arrives while next(gen) is still running in
        # the executor thread skips the handler above and lands here.
        # Closing a generator that is currently executing raises
        # ValueError("generator already executing"); letting that escape
        # would replace the CancelledError and the task would no longer
        # be reported as cancelled. Log it and let the original
        # exception (if any) propagate.
        try:
            gen.close()
        except ValueError as close_err:
            self.log.warning(
                "Could not close sync generator for request %s: %s",
                request_id, close_err
            )
| 416 | |
| 417 | async def _stream_async_generator(self, request_id, gen, writer): |
| 418 | """ |
no test coverage detected