MCPcopy
hub / github.com/encode/httpx / TextChunker

Class TextChunker

httpx/_decoders.py:267–303  ·  view source on GitHub ↗

Handles returning text content in fixed-size chunks.

Source from the content-addressed store, hash-verified

265
266
267class TextChunker:
268 """
269 Handles returning text content in fixed-size chunks.
270 """
271
272 def __init__(self, chunk_size: int | None = None) -> None:
273 self._buffer = io.StringIO()
274 self._chunk_size = chunk_size
275
276 def decode(self, content: str) -> list[str]:
277 if self._chunk_size is None:
278 return [content] if content else []
279
280 self._buffer.write(content)
281 if self._buffer.tell() >= self._chunk_size:
282 value = self._buffer.getvalue()
283 chunks = [
284 value[i : i + self._chunk_size]
285 for i in range(0, len(value), self._chunk_size)
286 ]
287 if len(chunks[-1]) == self._chunk_size:
288 self._buffer.seek(0)
289 self._buffer.truncate()
290 return chunks
291 else:
292 self._buffer.seek(0)
293 self._buffer.write(chunks[-1])
294 self._buffer.truncate()
295 return chunks[:-1]
296 else:
297 return []
298
299 def flush(self) -> list[str]:
300 value = self._buffer.getvalue()
301 self._buffer.seek(0)
302 self._buffer.truncate()
303 return [value] if value else []
304
305
306class TextDecoder:

Callers 2

iter_textMethod · 0.85
aiter_textMethod · 0.85

Calls

no outgoing calls

Tested by

no test coverage detected