(self)
| 278 | assert response.headers["Server"] == "SocksTestServer" |
| 279 | |
def test_local_dns(self) -> None:
    """Request ``http://localhost`` through a ``socks5://`` proxy URL.

    The fake proxy asserts that the destination it is asked to connect to
    is already a loopback IP address (``127.0.0.1`` or ``::1``) on port 80
    rather than the hostname ``localhost``, then replies with an empty
    ``200 OK`` response that the client side verifies.
    """

    def request_handler(listener: socket.socket) -> None:
        """Serve one proxied connection accepted from *listener*."""
        sock = listener.accept()[0]
        try:
            # NOTE(review): handle_socks5_negotiation is defined elsewhere;
            # it is used here as a generator that first yields the
            # requested (addr, port) pair -- confirm the meaning of
            # negotiate=False against the helper.
            handler = handle_socks5_negotiation(sock, negotiate=False)
            addr, port = next(handler)

            assert addr in ["127.0.0.1", "::1"]
            assert port == 80
            # Sending a value back must exhaust the generator.
            with pytest.raises(StopIteration):
                handler.send(True)

            # Accumulate the request so a header terminator split across
            # two reads is still detected, and stop on EOF: recv() returns
            # b"" once the peer has closed, which would otherwise make
            # this loop spin forever.
            request = b""
            while not request.endswith(b"\r\n\r\n"):
                chunk = sock.recv(65535)
                if not chunk:
                    # Client went away before finishing its request;
                    # there is nobody left to answer.
                    return
                request += chunk

            sock.sendall(
                b"HTTP/1.1 200 OK\r\n"
                b"Server: SocksTestServer\r\n"
                b"Content-Length: 0\r\n"
                b"\r\n"
            )
        finally:
            # Release the accepted socket even when an assertion fails.
            sock.close()

    self._start_server(request_handler)
    proxy_url = f"socks5://{self.host}:{self.port}"
    with socks.SOCKSProxyManager(proxy_url) as pm:
        response = pm.request("GET", "http://localhost")

        assert response.status == 200
        assert response.data == b""
        assert response.headers["Server"] == "SocksTestServer"
| 313 | |
| 314 | def test_correct_header_line(self) -> None: |
| 315 | def request_handler(listener: socket.socket) -> None: |
nothing calls this directly
no test coverage detected