Provides the core interface to iterate over an asynchronous stream response.
| 211 | |
| 212 | |
| 213 | class AsyncStream(Generic[_T], metaclass=_AsyncStreamMeta): |
| 214 | """Provides the core interface to iterate over an asynchronous stream response.""" |
| 215 | |
| 216 | response: httpx.Response |
| 217 | _options: Optional[FinalRequestOptions] = None |
| 218 | _decoder: SSEDecoder | SSEBytesDecoder |
| 219 | |
def __init__(
    self,
    *,
    cast_to: type[_T],
    response: httpx.Response,
    client: AsyncAnthropic,
    options: Optional[FinalRequestOptions] = None,
) -> None:
    """Bind the stream to an HTTP response and the client that issued it.

    Args:
        cast_to: Type handed to the client's response-data processing for
            each item the stream yields.
        response: The HTTP response whose body is decoded into events.
        client: Client that supplies the SSE decoder and the
            response-data processing function.
        options: Optional request options kept on the stream.
    """
    # Plain state: nothing here depends on anything else.
    self._options = options
    self._client = client
    self._cast_to = cast_to
    self.response = response

    # The decoder is chosen by the client rather than hard-coded here.
    self._decoder = client._make_sse_decoder()

    # Built last, once every attribute it reads is in place. `__stream__`
    # is an async generator, so no part of its body runs until iteration.
    self._iterator = self.__stream__()
| 234 | |
async def __anext__(self) -> _T:
    """Await the underlying stream iterator and return its next item."""
    next_item = await self._iterator.__anext__()
    return next_item
| 237 | |
async def __aiter__(self) -> AsyncIterator[_T]:
    """Yield each remaining item from the underlying stream iterator.

    Iteration draws from the same `self._iterator` that `__anext__`
    advances, so items already taken through `__anext__` are not repeated.
    """
    stream = self._iterator
    async for element in stream:
        yield element
| 241 | |
async def _iter_events(self) -> AsyncIterator[ServerSentEvent]:
    """Yield the server-sent events decoded from the response's byte stream."""
    # Look up the decoder method first, then open the byte stream, so the
    # evaluation order is the same as a single nested call expression.
    decode = self._decoder.aiter_bytes
    events = decode(self.response.aiter_bytes())
    async for event in events:
        yield event
| 245 | |
@staticmethod
def raw_events(response: httpx.Response) -> AsyncIterator[ServerSentEvent]:
    """Return an iterator over the raw Server-Sent Events in `response`.

    Events come straight from a fresh `SSEDecoder`; no JSON parsing or
    event-name filtering is applied to them.

    The response body is read directly, so iterating the result consumes
    the response.
    """
    decoder = SSEDecoder()
    return decoder.aiter_bytes(response.aiter_bytes())
| 254 | |
| 255 | async def __stream__(self) -> AsyncIterator[_T]: |
| 256 | cast_to = cast(Any, self._cast_to) |
| 257 | response = self.response |
| 258 | process_data = self._client._process_response_data |
| 259 | iterator = self._iter_events() |
| 260 | |
| 261 | try: |
| 262 | async for sse in iterator: |
| 263 | if sse.event == "completion": |
| 264 | yield process_data(data=sse.json(), cast_to=cast_to, response=response) |
| 265 | |
| 266 | if ( |
| 267 | sse.event == "message_start" |
| 268 | or sse.event == "message_delta" |
| 269 | or sse.event == "message_stop" |
| 270 | or sse.event == "content_block_start" |
no outgoing calls