(self, text: str)
| 331 | self.trailing_cr: bool = False |
| 332 | |
def decode(self, text: str) -> list[str]:
    """Return the complete lines contained in *text*.

    Input may arrive in arbitrary pieces. Content that is not yet
    terminated by a line boundary is kept in ``self.buffer`` and is
    prefixed to the first line produced by a later call. A final ``\\r``
    is withheld (tracked by ``self.trailing_cr``) and re-attached to the
    front of the next call's input -- presumably so that a ``\\r\\n`` pair
    split across two calls is still seen as a single line break.

    Returned lines do not include their line-boundary characters.
    """
    # Characters that `str.splitlines` treats as line boundaries. See
    # https://docs.python.org/3/library/stdtypes.html#str.splitlines
    line_breaks = "\n\r\x0b\x0c\x1c\x1d\x1e\x85\u2028\u2029"

    # Re-attach a `\r` withheld by the previous call, then withhold a new
    # trailing `\r` (if any) for the next one.
    if self.trailing_cr:
        self.trailing_cr = False
        text = "\r" + text
    if text[-1:] == "\r":
        text = text[:-1]
        self.trailing_cr = True

    if not text:
        # NOTE: the edge case input of empty text doesn't occur in practice,
        # because other httpx internals filter out this value
        return []  # pragma: no cover

    ends_on_break = text[-1] in line_breaks
    pieces = text.splitlines()

    if not ends_on_break and len(pieces) == 1:
        # No line boundary at all: stash the fragment and emit nothing.
        self.buffer.append(pieces[0])
        return []

    if self.buffer:
        # Earlier fragments belong to the start of the first piece.
        pieces[0] = "".join(self.buffer) + pieces[0]
        self.buffer = []

    if ends_on_break:
        return pieces

    # The final piece is unterminated: hold it back as the new buffer.
    self.buffer = [pieces.pop()]
    return pieces
| 370 | |
| 371 | def flush(self) -> list[str]: |
| 372 | if not self.buffer and not self.trailing_cr: |
no test coverage detected