MCPcopy
hub / github.com/encode/httpx / ByteChunker

Class ByteChunker

httpx/_decoders.py:228–264  ·  view source on GitHub ↗

Handles returning byte content in fixed-size chunks.

Source from the content-addressed store, hash-verified

226
227
228class ByteChunker:
229 """
230 Handles returning byte content in fixed-size chunks.
231 """
232
233 def __init__(self, chunk_size: int | None = None) -> None:
234 self._buffer = io.BytesIO()
235 self._chunk_size = chunk_size
236
237 def decode(self, content: bytes) -> list[bytes]:
238 if self._chunk_size is None:
239 return [content] if content else []
240
241 self._buffer.write(content)
242 if self._buffer.tell() >= self._chunk_size:
243 value = self._buffer.getvalue()
244 chunks = [
245 value[i : i + self._chunk_size]
246 for i in range(0, len(value), self._chunk_size)
247 ]
248 if len(chunks[-1]) == self._chunk_size:
249 self._buffer.seek(0)
250 self._buffer.truncate()
251 return chunks
252 else:
253 self._buffer.seek(0)
254 self._buffer.write(chunks[-1])
255 self._buffer.truncate()
256 return chunks[:-1]
257 else:
258 return []
259
260 def flush(self) -> list[bytes]:
261 value = self._buffer.getvalue()
262 self._buffer.seek(0)
263 self._buffer.truncate()
264 return [value] if value else []
265
266
267class TextChunker:

Callers 4

iter_bytesMethod · 0.85
iter_rawMethod · 0.85
aiter_bytesMethod · 0.85
aiter_rawMethod · 0.85

Calls

no outgoing calls

Tested by

no test coverage detected