Establish connection to arbiter.
(self)
| 112 | return str(uuid.uuid4()) |
| 113 | |
def _connect(self):
    """Establish connection to arbiter.

    Does nothing when ``self._sock`` is already set. Otherwise opens an
    ``AF_UNIX`` stream socket, applies ``self.timeout`` to it and connects
    it to ``self.socket_path``. The socket is stored on ``self._sock`` only
    after the connect call has succeeded.

    Raises:
        StashError: If creating, configuring or connecting the socket
            fails with an ``OSError``; the original error is chained as
            ``__cause__``.
    """
    import socket

    if self._sock is not None:
        return

    sock = None
    try:
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        sock.settimeout(self.timeout)
        sock.connect(self.socket_path)
    except OSError as e:
        # socket.error is an alias of OSError, so one clause covers both.
        # Close the half-open socket explicitly instead of just dropping
        # the reference, so the file descriptor is released immediately.
        if sock is not None:
            sock.close()
        self._sock = None
        raise StashError(f"Failed to connect to arbiter: {e}") from e

    # Publish only a connected socket: a failure above can never leave an
    # unusable socket cached for the early-return guard to mistake as live.
    self._sock = sock
| 127 | |
| 128 | def _close(self): |
| 129 | """Close the connection.""" |
no test coverage detected