Handle the SOCKS4 handshake. Returns a generator object that allows us to break the handshake into steps so that the test code can intervene at certain useful points.
(
sock: socket.socket, username: bytes | None = None
)
| 190 | |
| 191 | |
| 192 | def handle_socks4_negotiation( |
| 193 | sock: socket.socket, username: bytes | None = None |
| 194 | ) -> typing.Generator[tuple[bytes | str, int], bool, None]: |
| 195 | """ |
| 196 | Handle the SOCKS4 handshake. |
| 197 | |
| 198 | Returns a generator object that allows us to break the handshake into |
| 199 | steps so that the test code can intervene at certain useful points. |
| 200 | """ |
| 201 | received_version = sock.recv(1) |
| 202 | command = sock.recv(1) |
| 203 | port_raw = _read_exactly(sock, 2) |
| 204 | port = (ord(port_raw[0:1]) << 8) + (ord(port_raw[1:2])) |
| 205 | addr_raw = _read_exactly(sock, 4) |
| 206 | provided_username = _read_until(sock, b"\x00")[:-1] # Strip trailing null. |
| 207 | |
| 208 | addr: bytes | str |
| 209 | if addr_raw == b"\x00\x00\x00\x01": |
| 210 | # Magic string: means DNS name. |
| 211 | addr = _read_until(sock, b"\x00")[:-1] # Strip trailing null. |
| 212 | else: |
| 213 | addr = socket.inet_ntoa(addr_raw) |
| 214 | |
| 215 | # Check some basic stuff. |
| 216 | assert received_version == SOCKS_VERSION_SOCKS4 |
| 217 | assert command == b"\x01" # Only support connect, not bind. |
| 218 | |
| 219 | if username is not None and username != provided_username: |
| 220 | sock.sendall(b"\x00\x5d\x00\x00\x00\x00\x00\x00") |
| 221 | sock.close() |
| 222 | return |
| 223 | |
| 224 | # Yield the address port tuple. |
| 225 | succeed = yield addr, port |
| 226 | |
| 227 | if succeed: |
| 228 | response = b"\x00\x5a\xea\x60\x7f\x00\x00\x01" |
| 229 | else: |
| 230 | response = b"\x00\x5b\x00\x00\x00\x00\x00\x00" |
| 231 | |
| 232 | sock.sendall(response) |
| 233 | |
| 234 | |
| 235 | class TestSOCKSProxyManager: |
no test coverage detected