MCPcopy
hub / github.com/openai/openai-python / _iter_chunks

Method _iter_chunks

src/openai/_streaming.py:305–315  ·  view source on GitHub ↗

Given an iterator that yields raw binary data, iterate over it and yield individual SSE chunks

(self, iterator: Iterator[bytes])

Source from the content-addressed store, hash-verified

303 yield sse
304
305 def _iter_chunks(self, iterator: Iterator[bytes]) -> Iterator[bytes]:
306 """Given an iterator that yields raw binary data, iterate over it and yield individual SSE chunks"""
307 data = b""
308 for chunk in iterator:
309 for line in chunk.splitlines(keepends=True):
310 data += line
311 if data.endswith((b"\r\r", b"\n\n", b"\r\n\r\n")):
312 yield data
313 data = b""
314 if data:
315 yield data
316
317 async def aiter_bytes(self, iterator: AsyncIterator[bytes]) -> AsyncIterator[ServerSentEvent]:
318 """Given an iterator that yields raw binary data, iterate over it & yield every event encountered"""

Callers 1

iter_bytesMethod · 0.95

Calls

no outgoing calls

Tested by

no test coverage detected