Provides the core interface to iterate over a synchronous stream response.
| 21 | |
| 22 | |
| 23 | class Stream(Generic[_T]): |
| 24 | """Provides the core interface to iterate over a synchronous stream response.""" |
| 25 | |
| 26 | response: httpx.Response |
| 27 | _options: Optional[FinalRequestOptions] = None |
| 28 | _decoder: SSEBytesDecoder |
| 29 | |
def __init__(
    self,
    *,
    cast_to: type[_T],
    response: httpx.Response,
    client: OpenAI,
    options: Optional[FinalRequestOptions] = None,
) -> None:
    """Bind the stream to an HTTP response and prepare its item iterator.

    Args:
        cast_to: Stored as ``_cast_to``; ``__stream__`` reads it back when
            handling events.
        response: The HTTP response this stream wraps; its bytes are
            consumed through ``response.iter_bytes()``.
        client: Stored as ``_client``; also supplies the SSE decoder via
            ``client._make_sse_decoder()``.
        options: Optional request options, stored unchanged as ``_options``.
    """
    # Plain attribute bindings; none of them depends on another.
    self._options = options
    self._client = client
    self._cast_to = cast_to
    self.response = response

    # The decoder is obtained from the client rather than built here.
    self._decoder = client._make_sse_decoder()

    # Created last so every attribute above exists before iteration starts.
    # NOTE(review): __stream__ is presumably a generator function (its body
    # is only partly visible here), in which case nothing is read from the
    # response until the first item is requested - confirm.
    self._iterator = self.__stream__()
| 44 | |
def __next__(self) -> _T:
    """Return the next item from the wrapped iterator.

    Raises:
        StopIteration: Propagated from the wrapped iterator once it is
            exhausted.
    """
    return next(self._iterator)
| 47 | |
def __iter__(self) -> Iterator[_T]:
    """Yield, in order, every item still pending in the wrapped iterator."""
    # An explicit loop is used on purpose instead of ``yield from``:
    # closing this generator must not forward ``close()`` to
    # ``self._iterator``, which stays usable afterwards.
    source = self._iterator
    for chunk in source:
        yield chunk
| 51 | |
def _iter_events(self) -> Iterator[ServerSentEvent]:
    """Yield the events the decoder produces from the response byte stream."""
    # Resolve the decoder entry point first, then open the byte iterator,
    # matching the evaluation order of the single nested call this replaces.
    decode = self._decoder.iter_bytes
    raw_chunks = self.response.iter_bytes()
    yield from decode(raw_chunks)
| 54 | |
| 55 | def __stream__(self) -> Iterator[_T]: |
| 56 | cast_to = cast(Any, self._cast_to) |
| 57 | response = self.response |
| 58 | process_data = self._client._process_response_data |
| 59 | iterator = self._iter_events() |
| 60 | |
| 61 | try: |
| 62 | for sse in iterator: |
| 63 | if sse.data.startswith("[DONE]"): |
| 64 | break |
| 65 | |
| 66 | # we have to special case the Assistants `thread.` events since we won't have an "event" key in the data |
| 67 | if sse.event and sse.event.startswith("thread."): |
| 68 | data = sse.json() |
| 69 | |
| 70 | if sse.event == "error" and is_mapping(data) and data.get("error"): |
| 71 | message = None |
| 72 | error = data.get("error") |
| 73 | if is_mapping(error): |
| 74 | message = error.get("message") |
| 75 | if not message or not isinstance(message, str): |
| 76 | message = "An error occurred during streaming" |
| 77 | |
| 78 | raise APIError( |
| 79 | message=message, |
| 80 | request=self.response.request, |
no outgoing calls