MCPcopy
hub / github.com/openai/openai-python / AsyncStream

Class AsyncStream

src/openai/_streaming.py:132–239  ·  view source on GitHub ↗

Provides the core interface to iterate over an asynchronous stream response.

Source from the content-addressed store, hash-verified

130
131
132class AsyncStream(Generic[_T]):
133 """Provides the core interface to iterate over an asynchronous stream response."""
134
135 response: httpx.Response
136 _options: Optional[FinalRequestOptions] = None
137 _decoder: SSEDecoder | SSEBytesDecoder
138
139 def __init__(
140 self,
141 *,
142 cast_to: type[_T],
143 response: httpx.Response,
144 client: AsyncOpenAI,
145 options: Optional[FinalRequestOptions] = None,
146 ) -> None:
147 self.response = response
148 self._cast_to = cast_to
149 self._client = client
150 self._options = options
151 self._decoder = client._make_sse_decoder()
152 self._iterator = self.__stream__()
153
154 async def __anext__(self) -> _T:
155 return await self._iterator.__anext__()
156
157 async def __aiter__(self) -> AsyncIterator[_T]:
158 async for item in self._iterator:
159 yield item
160
161 async def _iter_events(self) -> AsyncIterator[ServerSentEvent]:
162 async for sse in self._decoder.aiter_bytes(self.response.aiter_bytes()):
163 yield sse
164
165 async def __stream__(self) -> AsyncIterator[_T]:
166 cast_to = cast(Any, self._cast_to)
167 response = self.response
168 process_data = self._client._process_response_data
169 iterator = self._iter_events()
170
171 try:
172 async for sse in iterator:
173 if sse.data.startswith("[DONE]"):
174 break
175
176 # we have to special case the Assistants `thread.` events since we won't have an "event" key in the data
177 if sse.event and sse.event.startswith("thread."):
178 data = sse.json()
179
180 if sse.event == "error" and is_mapping(data) and data.get("error"):
181 message = None
182 error = data.get("error")
183 if is_mapping(error):
184 message = error.get("message")
185 if not message or not isinstance(message, str):
186 message = "An error occurred during streaming"
187
188 raise APIError(
189 message=message,

Callers 1

make_event_iteratorFunction · 0.90

Calls

no outgoing calls

Tested by 1

make_event_iteratorFunction · 0.72