MCPcopy
hub / github.com/urllib3/urllib3 / handle_socks5_negotiation

Function handle_socks5_negotiation

test/contrib/test_socks.py:125–189  ·  view source on GitHub ↗

Handle the SOCKS5 handshake. Returns a generator object that allows us to break the handshake into steps so that the test code can intervene at certain useful points.

(
    sock: socket.socket,
    negotiate: bool,
    username: bytes | None = None,
    password: bytes | None = None,
)

Source from the content-addressed store, hash-verified

123
124
125def handle_socks5_negotiation(
126 sock: socket.socket,
127 negotiate: bool,
128 username: bytes | None = None,
129 password: bytes | None = None,
130) -> typing.Generator[tuple[bytes | str, int], bool, None]:
131 """
132 Handle the SOCKS5 handshake.
133
134 Returns a generator object that allows us to break the handshake into
135 steps so that the test code can intervene at certain useful points.
136 """
137 received_version = sock.recv(1)
138 assert received_version == SOCKS_VERSION_SOCKS5
139 nmethods = ord(sock.recv(1))
140 methods = _read_exactly(sock, nmethods)
141
142 if negotiate:
143 assert SOCKS_NEGOTIATION_PASSWORD in methods
144 send_data = SOCKS_VERSION_SOCKS5 + SOCKS_NEGOTIATION_PASSWORD
145 sock.sendall(send_data)
146
147 # This is the password negotiation.
148 negotiation_version = sock.recv(1)
149 assert negotiation_version == b"\x01"
150 ulen = ord(sock.recv(1))
151 provided_username = _read_exactly(sock, ulen)
152 plen = ord(sock.recv(1))
153 provided_password = _read_exactly(sock, plen)
154
155 if username == provided_username and password == provided_password:
156 sock.sendall(b"\x01\x00")
157 else:
158 sock.sendall(b"\x01\x01")
159 sock.close()
160 return
161 else:
162 assert SOCKS_NEGOTIATION_NONE in methods
163 send_data = SOCKS_VERSION_SOCKS5 + SOCKS_NEGOTIATION_NONE
164 sock.sendall(send_data)
165
166 # Client sends where they want to go.
167 received_version = sock.recv(1)
168 command = sock.recv(1)
169 reserved = sock.recv(1)
170 addr = _address_from_socket(sock)
171 port_raw = _read_exactly(sock, 2)
172 port = (ord(port_raw[0:1]) << 8) + (ord(port_raw[1:2]))
173
174 # Check some basic stuff.
175 assert received_version == SOCKS_VERSION_SOCKS5
176 assert command == b"\x01" # Only support connect, not bind.
177 assert reserved == b"\x00"
178
179 # Yield the address port tuple.
180 succeed = yield addr, port
181
182 if succeed:

Callers 2

request_handler · Method · 0.85
request_handler · Method · 0.85

Calls 5

_read_exactly · Function · 0.85
_address_from_socket · Function · 0.85
recv · Method · 0.45
sendall · Method · 0.45
close · Method · 0.45

Tested by

no test coverage detected