Poll the socket to see if there's data that can be read.
(self, timeout: float = 0)
| 1339 | ) |
| 1340 | |
| 1341 | def can_read(self, timeout: float = 0) -> bool: |
| 1342 | """Poll the socket to see if there's data that can be read.""" |
| 1343 | # TODO: Rename this API; it detects pending data or dirty/closed |
| 1344 | # connection state, not only whether application data can be read. |
| 1345 | sock = self._sock |
| 1346 | if not sock: |
| 1347 | self.connect() |
| 1348 | |
| 1349 | host_error = self._host_error() |
| 1350 | |
| 1351 | try: |
| 1352 | return self._parser.can_read(timeout) |
| 1353 | |
| 1354 | except OSError as e: |
| 1355 | self.disconnect() |
| 1356 | raise ConnectionError(f"Error while reading from {host_error}: {e.args}") |
| 1357 | |
| 1358 | def read_response( |
| 1359 | self, |
no test coverage detected