(self)
| 406 | evt.set() |
| 407 | |
| 408 | def test_socks_with_password(self) -> None: |
| 409 | def request_handler(listener: socket.socket) -> None: |
| 410 | sock = listener.accept()[0] |
| 411 | |
| 412 | handler = handle_socks5_negotiation( |
| 413 | sock, negotiate=True, username=b"user", password=b"pass" |
| 414 | ) |
| 415 | addr, port = next(handler) |
| 416 | |
| 417 | assert addr == "16.17.18.19" |
| 418 | assert port == 80 |
| 419 | with pytest.raises(StopIteration): |
| 420 | handler.send(True) |
| 421 | |
| 422 | while True: |
| 423 | buf = sock.recv(65535) |
| 424 | if buf.endswith(b"\r\n\r\n"): |
| 425 | break |
| 426 | |
| 427 | sock.sendall( |
| 428 | b"HTTP/1.1 200 OK\r\n" |
| 429 | b"Server: SocksTestServer\r\n" |
| 430 | b"Content-Length: 0\r\n" |
| 431 | b"\r\n" |
| 432 | ) |
| 433 | sock.close() |
| 434 | |
| 435 | self._start_server(request_handler) |
| 436 | proxy_url = f"socks5://{self.host}:{self.port}" |
| 437 | with socks.SOCKSProxyManager(proxy_url, username="user", password="pass") as pm: |
| 438 | response = pm.request("GET", "http://16.17.18.19") |
| 439 | |
| 440 | assert response.status == 200 |
| 441 | assert response.data == b"" |
| 442 | assert response.headers["Server"] == "SocksTestServer" |
| 443 | |
| 444 | def test_socks_with_auth_in_url(self) -> None: |
| 445 | """ |
nothing calls this directly
no test coverage detected