MCPcopy
hub / github.com/fastapi/fastapi / _sse_producer_cm

Function _sse_producer_cm

fastapi/routing.py:551–600  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

549
550 @asynccontextmanager
551 async def _sse_producer_cm() -> AsyncIterator[
552 ObjectReceiveStream[bytes]
553 ]:
554 # Use a memory stream to decouple generator iteration
555 # from the keepalive timer. A producer task pulls items
556 # from the generator independently, so
557 # `anyio.fail_after` never wraps the generator's
558 # `__anext__` directly - avoiding CancelledError that
559 # would finalize the generator and also working for sync
560 # generators running in a thread pool.
561 #
562 # This context manager is entered on the request-scoped
563 # AsyncExitStack so its __aexit__ (which cancels the
564 # task group) is called by the exit stack after the
565 # streaming response completes — not by async generator
566 # finalization via GeneratorExit.
567 # Ref: https://peps.python.org/pep-0789/
568 send_stream, receive_stream = anyio.create_memory_object_stream[
569 bytes
570 ](max_buffer_size=1)
571
572 async def _producer() -> None:
573 async with send_stream:
574 async for raw_item in sse_aiter:
575 await send_stream.send(_serialize_sse_item(raw_item))
576
577 send_keepalive, receive_keepalive = (
578 anyio.create_memory_object_stream[bytes](max_buffer_size=1)
579 )
580
581 async def _keepalive_inserter() -> None:
582 """Read from the producer and forward to the output,
583 inserting keepalive comments on timeout."""
584 async with send_keepalive, receive_stream:
585 try:
586 while True:
587 try:
588 with anyio.fail_after(_PING_INTERVAL):
589 data = await receive_stream.receive()
590 await send_keepalive.send(data)
591 except TimeoutError:
592 await send_keepalive.send(KEEPALIVE_COMMENT)
593 except anyio.EndOfStream:
594 pass
595
596 async with anyio.create_task_group() as tg:
597 tg.start_soon(_producer)
598 tg.start_soon(_keepalive_inserter)
599 yield receive_keepalive
600 tg.cancel_scope.cancel()
601
602 # Enter the SSE context manager on the request-scoped
603 # exit stack. The stack outlives the streaming response,

Callers 1

appFunction · 0.85

Calls

no outgoing calls

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…