hub / github.com/openai/openai-python / iter_bytes

Method iter_bytes

src/openai/_streaming.py:295–303

Given an iterator that yields raw binary data, iterate over it & yield every event encountered

(self, iterator: Iterator[bytes])

Source from the content-addressed store, hash-verified

293         self._retry = None
294
295     def iter_bytes(self, iterator: Iterator[bytes]) -> Iterator[ServerSentEvent]:
296         """Given an iterator that yields raw binary data, iterate over it & yield every event encountered"""
297         for chunk in self._iter_chunks(iterator):
298             # Split before decoding so splitlines() only uses \r and \n
299             for raw_line in chunk.splitlines():
300                 line = raw_line.decode("utf-8")
301                 sse = self.decode(line)
302                 if sse:
303                     yield sse
304
305     def _iter_chunks(self, iterator: Iterator[bytes]) -> Iterator[bytes]:
306         """Given an iterator that yields raw binary data, iterate over it and yield individual SSE chunks"""

Callers 1

_iter_events · Method · 0.45

Calls 2

_iter_chunks · Method · 0.95
decode · Method · 0.95
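To show how the two callees fit together, here is a minimal, self-contained sketch of the same three-stage flow: regroup network reads into chunks, split each chunk into lines, and feed each line to a line-oriented decoder. `TinyDecoder` is a hypothetical stand-in written for this illustration. Its `_iter_chunks` and `decode` are simplified assumptions (only `\n\n` terminators, only `data:` fields, plain strings instead of `ServerSentEvent`) and do not reproduce the library's implementations; only the body of `iter_bytes` mirrors the listing above.

```python
from typing import Iterator, List, Optional


class TinyDecoder:
    """Hypothetical stand-in for illustration; not the library's decoder."""

    def __init__(self) -> None:
        self._data: List[str] = []

    def _iter_chunks(self, iterator: Iterator[bytes]) -> Iterator[bytes]:
        # Assumption: a chunk ends at a blank line, and only "\n\n" is handled.
        buffer = b""
        for piece in iterator:
            buffer += piece
            while b"\n\n" in buffer:
                chunk, buffer = buffer.split(b"\n\n", 1)
                yield chunk + b"\n\n"
        if buffer:
            # Flush whatever is left when the stream ends without a terminator.
            yield buffer

    def decode(self, line: str) -> Optional[str]:
        # A blank line dispatches the buffered event; other lines are buffered.
        if not line:
            if not self._data:
                return None
            event = "\n".join(self._data)
            self._data = []
            return event
        if line.startswith("data:"):
            value = line[len("data:"):]
            if value.startswith(" "):
                value = value[1:]  # drop the single optional leading space
            self._data.append(value)
        return None

    def iter_bytes(self, iterator: Iterator[bytes]) -> Iterator[str]:
        # Same shape as the listing: chunk, split bytes into lines, decode.
        for chunk in self._iter_chunks(iterator):
            for raw_line in chunk.splitlines():
                event = self.decode(raw_line.decode("utf-8"))
                if event:
                    yield event


# Events deliberately straddle the boundaries between network reads.
network_reads = [b"data: hel", b"lo\n\ndata: wor", b"ld\n\n"]
events = list(TinyDecoder().iter_bytes(iter(network_reads)))
print(events)  # ['hello', 'world']
```

Because the chunking stage buffers until it sees a terminator, the line-splitting stage never receives a partial line, even when a read boundary falls in the middle of a field.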

Tested by

no test coverage detected