MCPcopy
hub / github.com/encode/httpx / decode

Method decode

httpx/_decoders.py:333–369  ·  view source on GitHub ↗
(self, text: str)

Source from the content-addressed store, hash-verified

331 self.trailing_cr: bool = False
332
333 def decode(self, text: str) -> list[str]:
334 # See https://docs.python.org/3/library/stdtypes.html#str.splitlines
335 NEWLINE_CHARS = "\n\r\x0b\x0c\x1c\x1d\x1e\x85\u2028\u2029"
336
337 # We always push a trailing `\r` into the next decode iteration.
338 if self.trailing_cr:
339 text = "\r" + text
340 self.trailing_cr = False
341 if text.endswith("\r"):
342 self.trailing_cr = True
343 text = text[:-1]
344
345 if not text:
346 # NOTE: the edge case input of empty text doesn't occur in practice,
347 # because other httpx internals filter out this value
348 return [] # pragma: no cover
349
350 trailing_newline = text[-1] in NEWLINE_CHARS
351 lines = text.splitlines()
352
353 if len(lines) == 1 and not trailing_newline:
354 # No new lines, buffer the input and continue.
355 self.buffer.append(lines[0])
356 return []
357
358 if self.buffer:
359 # Include any existing buffer in the first portion of the
360 # splitlines result.
361 lines = ["".join(self.buffer) + lines[0]] + lines[1:]
362 self.buffer = []
363
364 if not trailing_newline:
365 # If the last segment of splitlines is not newline terminated,
366 # then drop it from our output and start a new buffer.
367 self.buffer = [lines.pop()]
368
369 return lines
370
371 def flush(self) -> list[str]:
372 if not self.buffer and not self.trailing_cr:

Callers 2

iter_linesMethod · 0.95
aiter_linesMethod · 0.95

Calls 1

joinMethod · 0.80

Tested by

no test coverage detected