Connect to the control socket. Raises: ControlClientError: If the connection fails.
(self)
| 45 | self._request_id = 0 |
| 46 | |
def connect(self):
    """
    Connect to control socket.

    Does nothing when ``self._sock`` already holds a socket. Otherwise a
    Unix-domain stream socket is created, ``self.timeout`` is applied to
    it, and it is connected to ``self.socket_path``. The socket is stored
    in ``self._sock`` only after the connection has succeeded.

    Raises:
        ControlClientError: If connection fails
    """
    if self._sock:
        return

    try:
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            sock.settimeout(self.timeout)
            sock.connect(self.socket_path)
        except BaseException:
            # Release the descriptor on any failure instead of leaving it
            # to the garbage collector, then let the error propagate.
            sock.close()
            raise
    except socket.error as e:
        self._sock = None
        # Chain the original error so the low-level cause stays visible.
        raise ControlClientError(f"Failed to connect to {self.socket_path}: {e}") from e

    self._sock = sock
| 64 | |
| 65 | def close(self): |
| 66 | """Close connection.""" |