MCPcopy
hub / github.com/encode/httpx / decode

Method decode

httpx/_decoders.py:237–258  ·  view source on GitHub ↗
(self, content: bytes)

Source from the content-addressed store, hash-verified

235 self._chunk_size = chunk_size
236
237 def decode(self, content: bytes) -> list[bytes]:
238 if self._chunk_size is None:
239 return [content] if content else []
240
241 self._buffer.write(content)
242 if self._buffer.tell() >= self._chunk_size:
243 value = self._buffer.getvalue()
244 chunks = [
245 value[i : i + self._chunk_size]
246 for i in range(0, len(value), self._chunk_size)
247 ]
248 if len(chunks[-1]) == self._chunk_size:
249 self._buffer.seek(0)
250 self._buffer.truncate()
251 return chunks
252 else:
253 self._buffer.seek(0)
254 self._buffer.write(chunks[-1])
255 self._buffer.truncate()
256 return chunks[:-1]
257 else:
258 return []
259
260 def flush(self) -> list[bytes]:
261 value = self._buffer.getvalue()

Callers 4

iter_bytesMethod · 0.95
iter_rawMethod · 0.95
aiter_bytesMethod · 0.95
aiter_rawMethod · 0.95

Calls

no outgoing calls

Tested by

no test coverage detected