(
self, disable_decoding=False, timeout: Union[float, object] = SENTINEL
)
| 11 | """RESP2 protocol implementation""" |
| 12 | |
def read_response(
    self, disable_decoding=False, timeout: Union[float, object] = SENTINEL
):
    """Read one response via ``_read_response`` with buffer bookkeeping.

    The buffer position is captured before the read. If the read raises,
    the buffer is rewound to that position and the exception is re-raised
    unchanged; if it succeeds, the buffer is purged and the parsed result
    is returned.

    Args:
        disable_decoding: Forwarded unchanged to ``_read_response``.
        timeout: Forwarded unchanged to ``_read_response``; defaults to
            the module's ``SENTINEL`` marker.

    Returns:
        Whatever ``_read_response`` returns.

    Raises:
        BaseException: Anything raised by ``_read_response`` propagates
            after the rewind attempt.
    """
    # Snapshot the current position so a failed read can be undone.
    # The buffer may be absent (falsy), in which case there is nothing
    # to snapshot.
    pos = self._buffer.get_pos() if self._buffer else None
    try:
        result = self._read_response(
            disable_decoding=disable_decoding, timeout=timeout
        )
    except BaseException:
        # BaseException rather than Exception: the rewind must also run
        # for non-Exception interruptions (e.g. KeyboardInterrupt).
        if self._buffer:
            self._buffer.rewind(pos)
        raise
    else:
        # Guarded like the snapshot and rewind above: if the buffer is
        # gone by now, still hand back the parsed result instead of
        # failing with AttributeError on None.
        if self._buffer:
            self._buffer.purge()
        return result
| 28 | |
| 29 | def _read_response( |
| 30 | self, disable_decoding=False, timeout: Union[float, object] = SENTINEL |
no test coverage detected