Provides the core interface to iterate over an asynchronous stream response.
| 130 | |
| 131 | |
| 132 | class AsyncStream(Generic[_T]): |
| 133 | """Provides the core interface to iterate over an asynchronous stream response.""" |
| 134 | |
| 135 | response: httpx.Response |
| 136 | _options: Optional[FinalRequestOptions] = None |
| 137 | _decoder: SSEDecoder | SSEBytesDecoder |
| 138 | |
def __init__(
    self,
    *,
    cast_to: type[_T],
    response: httpx.Response,
    client: AsyncOpenAI,
    options: Optional[FinalRequestOptions] = None,
) -> None:
    """Bind the stream to an HTTP response and prepare it for iteration.

    All arguments are keyword-only.

    Args:
        cast_to: Type stored on the instance as ``_cast_to``.
        response: The ``httpx.Response`` kept on the instance as ``response``.
        client: Client kept as ``_client``; it also supplies the SSE decoder
            through ``client._make_sse_decoder()``.
        options: Optional request options kept as ``_options``.
    """
    # Plain state captured from the caller.
    self.response = response
    self._cast_to, self._client, self._options = cast_to, client, options

    # The decoder comes from the client.
    self._decoder = client._make_sse_decoder()

    # A single iterator object is created here; both `__anext__` and
    # `__aiter__` read from this same object.
    self._iterator = self.__stream__()
| 153 | |
async def __anext__(self) -> _T:
    """Return the next item by advancing the iterator built in ``__init__``."""
    upcoming = await self._iterator.__anext__()
    return upcoming
| 156 | |
async def __aiter__(self) -> AsyncIterator[_T]:
    """Yield each item produced by the iterator stored on this instance."""
    source = self._iterator
    async for element in source:
        yield element
| 160 | |
async def _iter_events(self) -> AsyncIterator[ServerSentEvent]:
    """Yield the events the decoder produces from the response's byte stream."""
    # Look up the decoder method first, then build the byte stream, so the
    # evaluation order matches a direct nested call.
    decode = self._decoder.aiter_bytes
    event_stream = decode(self.response.aiter_bytes())
    async for event in event_stream:
        yield event
| 164 | |
| 165 | async def __stream__(self) -> AsyncIterator[_T]: |
| 166 | cast_to = cast(Any, self._cast_to) |
| 167 | response = self.response |
| 168 | process_data = self._client._process_response_data |
| 169 | iterator = self._iter_events() |
| 170 | |
| 171 | try: |
| 172 | async for sse in iterator: |
| 173 | if sse.data.startswith("[DONE]"): |
| 174 | break |
| 175 | |
| 176 | # we have to special case the Assistants `thread.` events since we won't have an "event" key in the data |
| 177 | if sse.event and sse.event.startswith("thread."): |
| 178 | data = sse.json() |
| 179 | |
| 180 | if sse.event == "error" and is_mapping(data) and data.get("error"): |
| 181 | message = None |
| 182 | error = data.get("error") |
| 183 | if is_mapping(error): |
| 184 | message = error.get("message") |
| 185 | if not message or not isinstance(message, str): |
| 186 | message = "An error occurred during streaming" |
| 187 | |
| 188 | raise APIError( |
| 189 | message=message, |
no outgoing calls