Given an iterator that yields raw binary data, iterate over it & yield every event encountered
(self, iterator: Iterator[bytes])
| 293 | self._retry = None |
| 294 | |
def iter_bytes(self, iterator: Iterator[bytes]) -> Iterator[ServerSentEvent]:
    """Turn a stream of raw bytes into the server-sent events it contains.

    Every chunk produced by ``self._iter_chunks(iterator)`` is cut into lines
    while it is still bytes; each line is then decoded as UTF-8 and passed to
    ``self.decode``. Whatever truthy value ``self.decode`` returns is yielded,
    falsy results are dropped.
    """
    for block in self._iter_chunks(iterator):
        # Line-split the bytes, not the decoded text: bytes.splitlines() only
        # breaks on \r, \n and \r\n, while str.splitlines() would also break
        # on additional Unicode line boundaries.
        for encoded in block.splitlines():
            event = self.decode(encoded.decode("utf-8"))
            if not event:
                continue
            yield event
| 304 | |
| 305 | def _iter_chunks(self, iterator: Iterator[bytes]) -> Iterator[bytes]: |
| 306 | """Given an iterator that yields raw binary data, iterate over it and yield individual SSE chunks""" |
no test coverage detected