hub / github.com/encode/starlette / stream_indefinitely

Function stream_indefinitely

tests/test_responses.py:622–626
()

Source from the content-addressed store, hash-verified

disconnected.set()

async def stream_indefinitely() -> AsyncIterator[bytes]:
    while True:
        # Need a sleep for the event loop to switch to another task
        await anyio.sleep(0)
        yield b"chunk "

response = StreamingResponse(content=stream_indefinitely())
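
The comment in the loop says the sleep is there so the event loop can switch to another task. The sketch below illustrates that effect in isolation. It is not part of the Starlette test suite: it uses `asyncio.sleep(0)` from the standard library as a stand-in for `anyio.sleep(0)`, and `consume`, `set_after`, and `main` are hypothetical helpers introduced only for this example.

```python
import asyncio
from typing import AsyncIterator


async def stream_indefinitely() -> AsyncIterator[bytes]:
    while True:
        # Zero-length sleep: suspends this task once per iteration so
        # other scheduled tasks get a turn on the event loop.
        await asyncio.sleep(0)
        yield b"chunk "


async def consume(stop: asyncio.Event) -> int:
    """Hypothetical consumer: count chunks until another task sets `stop`."""
    received = 0
    stream = stream_indefinitely()
    try:
        async for _ in stream:
            received += 1
            if stop.is_set():
                break
    finally:
        # Close the generator explicitly instead of relying on finalization.
        await stream.aclose()
    return received


async def set_after(stop: asyncio.Event, ticks: int) -> None:
    """Hypothetical helper: let the loop cycle a few times, then signal stop."""
    for _ in range(ticks):
        await asyncio.sleep(0)
    stop.set()


async def main() -> int:
    stop = asyncio.Event()
    consumer = asyncio.create_task(consume(stop))
    await set_after(stop, ticks=5)
    # The timeout is a safety net; the consumer should stop on its own.
    return await asyncio.wait_for(consumer, timeout=1)


chunks = asyncio.run(main())
print(f"consumer stopped after {chunks} chunk(s)")
```

Without the `await` inside the loop, the generator would never suspend, so the task that sets `stop` would presumably never get to run and the consumer would only end when interrupted from outside. The exact chunk count depends on scheduling order, so the example does not assert a specific value.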

Calls

no outgoing calls

Tested by

no test coverage detected