Check the health of the connection with a PING/PONG
(self)
| 656 | ) |
| 657 | |
async def check_health(self):
    """Check the health of the connection with a PING/PONG.

    Does nothing when ``self.health_check_interval`` is falsy, or when the
    running event loop's clock has not yet passed
    ``self.next_health_check``. Otherwise awaits
    ``self.retry.call_with_retry`` with ``self._send_ping`` and
    ``self._ping_failed``, passing ``with_failure_count=True``.
    """
    # Health checks are switched off: return before touching the event loop.
    if not self.health_check_interval:
        return

    now = asyncio.get_running_loop().time()
    if now > self.next_health_check:
        # NOTE(review): presumably _send_ping is the operation and
        # _ping_failed the failure callback -- confirm against the retry
        # helper's signature.
        await self.retry.call_with_retry(
            self._send_ping,
            self._ping_failed,
            with_failure_count=True,
        )
| 667 | |
| 668 | async def _send_packed_command(self, command: Iterable[bytes]) -> None: |
| 669 | self._writer.writelines(command) |
no test coverage detected