hub / github.com/encode/starlette / CustomAsyncIterator

Class CustomAsyncIterator

tests/test_responses.py:133–144  ·  tests/test_responses.py::CustomAsyncIterator

Source from the content-addressed store, hash-verified

131  ) -> None:
132      async def app(scope: Scope, receive: Receive, send: Send) -> None:
133          class CustomAsyncIterator:
134              def __init__(self) -> None:
135                  self._called = 0
136
137              def __aiter__(self) -> AsyncIterator[str]:
138                  return self
139
140              async def __anext__(self) -> str:
141                  if self._called == 5:
142                      raise StopAsyncIteration()
143                  self._called += 1
144                  return str(self._called)
145
146          response = StreamingResponse(CustomAsyncIterator(), media_type="text/plain")
147          await response(scope, receive, send)
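
The class implements the async iterator protocol by hand: `__aiter__` returns the object itself, and `__anext__` returns the next string until it raises `StopAsyncIteration`. The following is a minimal sketch of that behaviour outside Starlette, using only the standard library. The helper `collect_chunks` is a hypothetical name introduced for illustration and is not part of the test file.

```python
import asyncio
from collections.abc import AsyncIterator


class CustomAsyncIterator:
    """Same class as in the test: yields "1" through "5", then stops."""

    def __init__(self) -> None:
        self._called = 0

    def __aiter__(self) -> AsyncIterator[str]:
        # An async iterator returns itself from __aiter__.
        return self

    async def __anext__(self) -> str:
        if self._called == 5:
            # Signals the end of iteration to `async for`.
            raise StopAsyncIteration()
        self._called += 1
        return str(self._called)


async def collect_chunks() -> list[str]:
    # Hypothetical helper (not in the original test): drains the iterator
    # the way any `async for` consumer would.
    return [chunk async for chunk in CustomAsyncIterator()]


chunks = asyncio.run(collect_chunks())
print(chunks)  # ['1', '2', '3', '4', '5']
```

Each instance can be consumed only once, because the counter lives on the object that `__aiter__` returns. A consumer that streams the chunks in order, as `StreamingResponse` is given the chance to do in the test, would be expected to produce the concatenation `12345`.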

Callers 1

app · Function · 0.85

Calls

no outgoing calls

Tested by 1

app · Function · 0.68