()
| 549 | |
| 550 | @asynccontextmanager |
| 551 | async def _sse_producer_cm() -> AsyncIterator[ |
| 552 | ObjectReceiveStream[bytes] |
| 553 | ]: |
| 554 | # Use a memory stream to decouple generator iteration |
| 555 | # from the keepalive timer. A producer task pulls items |
| 556 | # from the generator independently, so |
| 557 | # `anyio.fail_after` never wraps the generator's |
| 558 | # `__anext__` directly - avoiding CancelledError that |
| 559 | # would finalize the generator and also working for sync |
| 560 | # generators running in a thread pool. |
| 561 | # |
| 562 | # This context manager is entered on the request-scoped |
| 563 | # AsyncExitStack so its __aexit__ (which cancels the |
| 564 | # task group) is called by the exit stack after the |
| 565 | # streaming response completes — not by async generator |
| 566 | # finalization via GeneratorExit. |
| 567 | # Ref: https://peps.python.org/pep-0789/ |
| 568 | send_stream, receive_stream = anyio.create_memory_object_stream[ |
| 569 | bytes |
| 570 | ](max_buffer_size=1) |
| 571 | |
| 572 | async def _producer() -> None: |
| 573 | async with send_stream: |
| 574 | async for raw_item in sse_aiter: |
| 575 | await send_stream.send(_serialize_sse_item(raw_item)) |
| 576 | |
| 577 | send_keepalive, receive_keepalive = ( |
| 578 | anyio.create_memory_object_stream[bytes](max_buffer_size=1) |
| 579 | ) |
| 580 | |
| 581 | async def _keepalive_inserter() -> None: |
| 582 | """Read from the producer and forward to the output, |
| 583 | inserting keepalive comments on timeout.""" |
| 584 | async with send_keepalive, receive_stream: |
| 585 | try: |
| 586 | while True: |
| 587 | try: |
| 588 | with anyio.fail_after(_PING_INTERVAL): |
| 589 | data = await receive_stream.receive() |
| 590 | await send_keepalive.send(data) |
| 591 | except TimeoutError: |
| 592 | await send_keepalive.send(KEEPALIVE_COMMENT) |
| 593 | except anyio.EndOfStream: |
| 594 | pass |
| 595 | |
| 596 | async with anyio.create_task_group() as tg: |
| 597 | tg.start_soon(_producer) |
| 598 | tg.start_soon(_keepalive_inserter) |
| 599 | yield receive_keepalive |
| 600 | tg.cancel_scope.cancel() |
| 601 | |
| 602 | # Enter the SSE context manager on the request-scoped |
| 603 | # exit stack. The stack outlives the streaming response, |
no outgoing calls
no test coverage detected
searching dependent graphs…