MCPcopy
hub / github.com/urllib3/urllib3 / test_correct_header_line

Method test_correct_header_line

test/contrib/test_socks.py:314–347  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

312 assert response.headers["Server"] == "SocksTestServer"
313
def test_correct_header_line(self) -> None:
    """Check the request line and Host header sent through a SOCKS5 proxy.

    A fake proxy is started with ``request_handler``. The handler asserts
    that the proxy is asked to connect to ``example.com`` port 80 by name
    (the proxy URL uses the ``socks5h`` scheme), that the forwarded request
    starts with ``GET / HTTP/1.1`` and that it carries
    ``Host: example.com``. It then answers with an empty ``200 OK``.
    """

    def request_handler(listener: socket.socket) -> None:
        sock = listener.accept()[0]

        # Close the accepted socket even when one of the assertions below
        # fails, so the client is not left waiting on a leaked connection.
        with sock:
            handler = handle_socks5_negotiation(sock, negotiate=False)
            addr, port = next(handler)

            assert addr == b"example.com"
            assert port == 80
            # The negotiation generator is expected to finish once it is
            # told the connection succeeded.
            with pytest.raises(StopIteration):
                handler.send(True)

            # Read until the blank line that terminates the HTTP headers.
            buf = b""
            while not buf.endswith(b"\r\n\r\n"):
                chunk = sock.recv(65535)
                if not chunk:
                    # recv() returns b"" once the peer has closed the
                    # connection; without this check the loop would spin
                    # forever. The assertions below then report the
                    # truncated request.
                    break
                buf += chunk

            assert buf.startswith(b"GET / HTTP/1.1")
            assert b"Host: example.com" in buf

            sock.sendall(
                b"HTTP/1.1 200 OK\r\n"
                b"Server: SocksTestServer\r\n"
                b"Content-Length: 0\r\n"
                b"\r\n"
            )

    self._start_server(request_handler)
    proxy_url = f"socks5h://{self.host}:{self.port}"
    with socks.SOCKSProxyManager(proxy_url) as pm:
        response = pm.request("GET", "http://example.com")
        assert response.status == 200
348
349 def test_connection_timeouts(self) -> None:
350 event = threading.Event()

Callers

nothing calls this directly

Calls 2

_start_serverMethod · 0.45
requestMethod · 0.45

Tested by

no test coverage detected