hub / github.com/anthropics/anthropic-sdk-python / make_event_iterator

Function make_event_iterator

tests/test_streaming.py:266–278
(
    content: Iterator[bytes],
    *,
    sync: bool,
    client: Anthropic,
    async_client: AsyncAnthropic,
)

Source from the content-addressed store, hash-verified

def make_event_iterator(
    content: Iterator[bytes],
    *,
    sync: bool,
    client: Anthropic,
    async_client: AsyncAnthropic,
) -> Iterator[ServerSentEvent] | AsyncIterator[ServerSentEvent]:
    if sync:
        return Stream(cast_to=object, client=client, response=httpx.Response(200, content=content))._iter_events()

    return AsyncStream(
        cast_to=object, client=async_client, response=httpx.Response(200, content=to_aiter(content))
    )._iter_events()
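
The sync/async switch above can be tried in isolation. The following is a minimal sketch with no SDK or httpx dependency: it hands back raw byte chunks rather than parsed ServerSentEvent objects. The body of to_aiter is an assumption (its source is not shown on this page), and make_chunk_iterator and collect are hypothetical names introduced only for illustration.

```python
import asyncio
from typing import AsyncIterator, Iterator, List, Union


async def to_aiter(chunks: Iterator[bytes]) -> AsyncIterator[bytes]:
    # Assumed shape of the helper: re-yield each chunk from an async generator.
    for chunk in chunks:
        yield chunk


def make_chunk_iterator(
    content: Iterator[bytes], *, sync: bool
) -> Union[Iterator[bytes], AsyncIterator[bytes]]:
    # Hypothetical stand-in for make_event_iterator: same sync/async switch,
    # but it returns the raw chunks instead of parsed events.
    if sync:
        return content
    return to_aiter(content)


async def collect(
    chunks: Union[Iterator[bytes], AsyncIterator[bytes]],
) -> List[bytes]:
    # Drain either kind of iterator so one test body can cover both modes.
    if hasattr(chunks, "__anext__"):
        return [chunk async for chunk in chunks]  # type: ignore[union-attr]
    return list(chunks)  # type: ignore[arg-type]


if __name__ == "__main__":
    data = [b"event: ping\n", b"data: {}\n\n"]
    for sync in (True, False):
        result = asyncio.run(collect(make_chunk_iterator(iter(data), sync=sync)))
        print(sync, result)
```

Returning a union of iterator types lets a single parametrized test feed the same byte content through both code paths, at the cost of the caller having to check which kind of iterator it received.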

Calls 4

Stream (Class) · 0.90
AsyncStream (Class) · 0.90
to_aiter (Function) · 0.70
_iter_events (Method) · 0.45

Tested by

no test coverage detected