(self)
| 312 | assert response.headers["Server"] == "SocksTestServer" |
| 313 | |
| 314 | def test_correct_header_line(self) -> None: |
| 315 | def request_handler(listener: socket.socket) -> None: |
| 316 | sock = listener.accept()[0] |
| 317 | |
| 318 | handler = handle_socks5_negotiation(sock, negotiate=False) |
| 319 | addr, port = next(handler) |
| 320 | |
| 321 | assert addr == b"example.com" |
| 322 | assert port == 80 |
| 323 | with pytest.raises(StopIteration): |
| 324 | handler.send(True) |
| 325 | |
| 326 | buf = b"" |
| 327 | while True: |
| 328 | buf += sock.recv(65535) |
| 329 | if buf.endswith(b"\r\n\r\n"): |
| 330 | break |
| 331 | |
| 332 | assert buf.startswith(b"GET / HTTP/1.1") |
| 333 | assert b"Host: example.com" in buf |
| 334 | |
| 335 | sock.sendall( |
| 336 | b"HTTP/1.1 200 OK\r\n" |
| 337 | b"Server: SocksTestServer\r\n" |
| 338 | b"Content-Length: 0\r\n" |
| 339 | b"\r\n" |
| 340 | ) |
| 341 | sock.close() |
| 342 | |
| 343 | self._start_server(request_handler) |
| 344 | proxy_url = f"socks5h://{self.host}:{self.port}" |
| 345 | with socks.SOCKSProxyManager(proxy_url) as pm: |
| 346 | response = pm.request("GET", "http://example.com") |
| 347 | assert response.status == 200 |
| 348 | |
| 349 | def test_connection_timeouts(self) -> None: |
| 350 | event = threading.Event() |
nothing calls this directly
no test coverage detected