(self)
| 1336 | return hasattr(self._fp, "fp") |
| 1337 | |
| 1338 | def _update_chunk_length(self) -> None: |
| 1339 | # First, we'll figure out length of a chunk and then |
| 1340 | # we'll try to read it from socket. |
| 1341 | if self.chunk_left is not None: |
| 1342 | return None |
| 1343 | line = self._fp.fp.readline() # type: ignore[union-attr] |
| 1344 | line = line.split(b";", 1)[0] |
| 1345 | try: |
| 1346 | self.chunk_left = int(line, 16) |
| 1347 | except ValueError: |
| 1348 | self.close() |
| 1349 | if line: |
| 1350 | # Invalid chunked protocol response, abort. |
| 1351 | raise InvalidChunkLength(self, line) from None |
| 1352 | else: |
| 1353 | # Truncated at start of next chunk |
| 1354 | raise ProtocolError("Response ended prematurely") from None |
| 1355 | |
| 1356 | def _handle_chunk(self, amt: int | None) -> bytes: |
| 1357 | returned_chunk = None |
no test coverage detected