MCPcopy
hub / github.com/urllib3/urllib3 / handle_socks4_negotiation

Function handle_socks4_negotiation

test/contrib/test_socks.py:192–232  ·  view source on GitHub ↗

Handle the SOCKS4 handshake. Returns a generator object that allows us to break the handshake into steps so that the test code can intervene at certain useful points.

(
    sock: socket.socket, username: bytes | None = None
)

Source from the content-addressed store, hash-verified

190
191
def handle_socks4_negotiation(
    sock: socket.socket, username: bytes | None = None
) -> typing.Generator[tuple[bytes | str, int], bool, None]:
    """
    Handle the SOCKS4 handshake.

    Returns a generator object that allows us to break the handshake into
    steps so that the test code can intervene at certain useful points.

    The generator reads the client's request from ``sock`` and yields the
    requested ``(addr, port)`` tuple. The caller then sends a boolean back
    into the generator: a truthy value makes it write the success reply to
    ``sock``, a falsy value makes it write the failure reply.

    If ``username`` is given and differs from the username supplied in the
    request, a rejection reply is written, ``sock`` is closed and the
    generator finishes without yielding anything.

    :param sock: Connected socket to read the request from and reply on.
    :param username: Expected username, or ``None`` to skip the check.
    :raises AssertionError: If the request's version byte is not
        ``SOCKS_VERSION_SOCKS4`` or its command byte is not ``b"\\x01"``.
    """
    received_version = sock.recv(1)
    command = sock.recv(1)
    # The port arrives as two bytes, most significant byte first.
    port = int.from_bytes(_read_exactly(sock, 2), "big")
    addr_raw = _read_exactly(sock, 4)
    provided_username = _read_until(sock, b"\x00")[:-1]  # Strip trailing null.

    addr: bytes | str
    if addr_raw == b"\x00\x00\x00\x01":
        # Magic string: means DNS name.
        addr = _read_until(sock, b"\x00")[:-1]  # Strip trailing null.
    else:
        addr = socket.inet_ntoa(addr_raw)

    # Check some basic stuff. This is done only after the whole request has
    # been read from the socket.
    assert received_version == SOCKS_VERSION_SOCKS4
    assert command == b"\x01"  # Only support connect, not bind.

    if username is not None and username != provided_username:
        # Username mismatch: reply with status byte 0x5d and hang up.
        sock.sendall(b"\x00\x5d\x00\x00\x00\x00\x00\x00")
        sock.close()
        return

    # Yield the address port tuple.
    succeed = yield addr, port

    if succeed:
        # Status byte 0x5a, followed by port 0xea60 and address 127.0.0.1.
        response = b"\x00\x5a\xea\x60\x7f\x00\x00\x01"
    else:
        # Status byte 0x5b with zeroed port and address fields.
        response = b"\x00\x5b\x00\x00\x00\x00\x00\x00"

    sock.sendall(response)
233
234
235class TestSOCKSProxyManager:

Callers 1

request_handlerMethod · 0.85

Calls 5

_read_exactlyFunction · 0.85
_read_untilFunction · 0.85
recvMethod · 0.45
sendallMethod · 0.45
closeMethod · 0.45

Tested by

no test coverage detected