Test successful connection.
(self)
| 45 | assert "Failed to connect" in str(exc_info.value) |
| 46 | |
def test_connect_success(self):
    """Test successful connection.

    Binds a Unix-domain stream socket inside a temporary directory, points
    a ``ControlClient`` at that path, calls ``connect()``, and asserts that
    the client's ``_sock`` attribute is not ``None`` afterwards.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        socket_path = os.path.join(tmpdir, "test.sock")

        # Create a listening socket. Using the socket as a context manager
        # closes it even if bind()/listen() or the test body raises.
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as server_sock:
            server_sock.bind(socket_path)
            server_sock.listen(1)

            client = ControlClient(socket_path)
            client.connect()
            try:
                assert client._sock is not None
            finally:
                # Close the client even when the assertion fails, so the
                # connected socket is not leaked.
                client.close()
| 65 | |
| 66 | def test_connect_already_connected(self): |
| 67 | """Test that connect is idempotent.""" |
nothing calls this directly
no test coverage detected