(self)
| 163 | yield sse |
| 164 | |
async def __stream__(self) -> AsyncIterator[_T]:
    """Turn the server-sent events of ``self.response`` into processed items.

    Events are pulled from ``self._iter_events()`` and handled as follows:

    * An event whose ``data`` starts with ``"[DONE]"`` ends the stream.
    * An event whose name starts with ``"thread."`` is always passed to
      ``self._client._process_response_data`` wrapped as
      ``{"data": <payload>, "event": <event name>}``.
    * Any other event whose JSON payload is a mapping with a truthy
      ``"error"`` entry aborts the stream with ``APIError``.
    * Any other event is passed on either wrapped (same shape as above) when
      ``self._options.synthesize_event_and_data`` is truthy, or as the bare
      payload otherwise.

    Yields:
        Whatever ``self._client._process_response_data`` returns for each
        event, called with ``cast_to=self._cast_to``.

    Raises:
        APIError: for a non-``thread.`` event carrying a truthy ``"error"``
            entry. The message is ``error["message"]`` when that is a
            non-empty string, otherwise a generic fallback.
    """
    cast_to = cast(Any, self._cast_to)
    response = self.response
    process_data = self._client._process_response_data
    iterator = self._iter_events()

    try:
        async for sse in iterator:
            if sse.data.startswith("[DONE]"):
                break

            data = sse.json()

            # We have to special-case the Assistants `thread.` events since we
            # won't have an "event" key in the data. No error check is done
            # here: an event named "error" can never start with "thread.", so
            # it is always handled by the generic path below.
            if sse.event and sse.event.startswith("thread."):
                yield process_data(data={"data": data, "event": sse.event}, cast_to=cast_to, response=response)
                continue

            if is_mapping(data) and data.get("error"):
                error = data.get("error")
                message = error.get("message") if is_mapping(error) else None
                if not message or not isinstance(message, str):
                    message = "An error occurred during streaming"

                raise APIError(
                    message=message,
                    request=self.response.request,
                    body=error,
                )

            wrap_payload = self._options is not None and self._options.synthesize_event_and_data
            yield process_data(
                data={"data": data, "event": sse.event} if wrap_payload else data,
                cast_to=cast_to,
                response=response,
            )
    finally:
        # Ensure the response is closed even if the consumer doesn't read all data
        await response.aclose()
| 221 | |
| 222 | async def __aenter__(self) -> Self: |
no test coverage detected