MCPcopy
hub / github.com/urllib3/urllib3 / pop_current_chunk

Method pop_current_chunk

test/test_response.py:2043–2071  ·  view source on GitHub ↗
(self, amt: int = -1, till_crlf: bool = False)

Source from the content-addressed store, hash-verified

2041 return chunk
2042
2043 def pop_current_chunk(self, amt: int = -1, till_crlf: bool = False) -> bytes:
2044 if amt > 0 and till_crlf:
2045 raise ValueError("Can't specify amt and till_crlf.")
2046 if len(self.cur_chunk) <= 0:
2047 self.cur_chunk = self._pop_new_chunk()
2048 if till_crlf:
2049 try:
2050 i = self.cur_chunk.index(b"\r\n")
2051 except ValueError:
2052 # No CRLF in current chunk -- probably caused by encoder.
2053 self.cur_chunk = b""
2054 return b""
2055 else:
2056 chunk_part = self.cur_chunk[: i + 2]
2057 self.cur_chunk = self.cur_chunk[i + 2 :]
2058 return chunk_part
2059 elif amt <= -1:
2060 chunk_part = self.cur_chunk
2061 self.cur_chunk = b""
2062 return chunk_part
2063 else:
2064 try:
2065 chunk_part = self.cur_chunk[:amt]
2066 except IndexError:
2067 chunk_part = self.cur_chunk
2068 self.cur_chunk = b""
2069 else:
2070 self.cur_chunk = self.cur_chunk[amt:]
2071 return chunk_part
2072
2073 def readline(self, amt: int = -1) -> bytes:
2074 return self.pop_current_chunk(amt, till_crlf=amt < 0)

Callers 3

readlineMethod · 0.95
readMethod · 0.95
read1Method · 0.95

Calls 1

_pop_new_chunkMethod · 0.95

Tested by

no test coverage detected