Test starting and stopping the server.
(self)
| 113 | """Tests for server start/stop.""" |
| 114 | |
def test_start_stop(self):
    """Test starting and stopping the server.

    Starts a ``ControlSocketServer`` on a socket path inside a temporary
    directory, checks that the socket file appears, then stops the server
    and checks that the socket file has been removed again.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        socket_path = os.path.join(tmpdir, "test.sock")

        arbiter = MockArbiter()
        server = ControlSocketServer(arbiter, socket_path)

        server.start()
        try:
            # Wait for server to start: poll up to 50 times, 0.1 s apart,
            # until the socket file shows up on disk.
            for _ in range(50):
                if os.path.exists(socket_path):
                    break
                time.sleep(0.1)
            time.sleep(0.2)  # Extra wait for server to be fully ready

            assert os.path.exists(socket_path)
        finally:
            # Always pair start() with stop(), even when the check above
            # fails, so a failing test does not leave a started server
            # behind while the temporary directory is being removed.
            server.stop()

        # Wait for cleanup
        time.sleep(0.2)

        # Socket should be cleaned up
        assert not os.path.exists(socket_path)
| 141 | |
| 142 | def test_start_already_running(self): |
| 143 | """Test that start is idempotent.""" |
nothing calls this directly
no test coverage detected