(self, amt: int = -1, till_crlf: bool = False)
| 2041 | return chunk |
| 2042 | |
| 2043 | def pop_current_chunk(self, amt: int = -1, till_crlf: bool = False) -> bytes: |
| 2044 | if amt > 0 and till_crlf: |
| 2045 | raise ValueError("Can't specify amt and till_crlf.") |
| 2046 | if len(self.cur_chunk) <= 0: |
| 2047 | self.cur_chunk = self._pop_new_chunk() |
| 2048 | if till_crlf: |
| 2049 | try: |
| 2050 | i = self.cur_chunk.index(b"\r\n") |
| 2051 | except ValueError: |
| 2052 | # No CRLF in current chunk -- probably caused by encoder. |
| 2053 | self.cur_chunk = b"" |
| 2054 | return b"" |
| 2055 | else: |
| 2056 | chunk_part = self.cur_chunk[: i + 2] |
| 2057 | self.cur_chunk = self.cur_chunk[i + 2 :] |
| 2058 | return chunk_part |
| 2059 | elif amt <= -1: |
| 2060 | chunk_part = self.cur_chunk |
| 2061 | self.cur_chunk = b"" |
| 2062 | return chunk_part |
| 2063 | else: |
| 2064 | try: |
| 2065 | chunk_part = self.cur_chunk[:amt] |
| 2066 | except IndexError: |
| 2067 | chunk_part = self.cur_chunk |
| 2068 | self.cur_chunk = b"" |
| 2069 | else: |
| 2070 | self.cur_chunk = self.cur_chunk[amt:] |
| 2071 | return chunk_part |
| 2072 | |
| 2073 | def readline(self, amt: int = -1) -> bytes: |
| 2074 | return self.pop_current_chunk(amt, till_crlf=amt < 0) |
no test coverage detected