Handle the SOCKS5 handshake. Returns a generator object that allows us to break the handshake into steps so that the test code can intervene at certain useful points.
(
sock: socket.socket,
negotiate: bool,
username: bytes | None = None,
password: bytes | None = None,
)
| 123 | |
| 124 | |
| 125 | def handle_socks5_negotiation( |
| 126 | sock: socket.socket, |
| 127 | negotiate: bool, |
| 128 | username: bytes | None = None, |
| 129 | password: bytes | None = None, |
| 130 | ) -> typing.Generator[tuple[bytes | str, int], bool, None]: |
| 131 | """ |
| 132 | Handle the SOCKS5 handshake. |
| 133 | |
| 134 | Returns a generator object that allows us to break the handshake into |
| 135 | steps so that the test code can intervene at certain useful points. |
| 136 | """ |
| 137 | received_version = sock.recv(1) |
| 138 | assert received_version == SOCKS_VERSION_SOCKS5 |
| 139 | nmethods = ord(sock.recv(1)) |
| 140 | methods = _read_exactly(sock, nmethods) |
| 141 | |
| 142 | if negotiate: |
| 143 | assert SOCKS_NEGOTIATION_PASSWORD in methods |
| 144 | send_data = SOCKS_VERSION_SOCKS5 + SOCKS_NEGOTIATION_PASSWORD |
| 145 | sock.sendall(send_data) |
| 146 | |
| 147 | # This is the password negotiation. |
| 148 | negotiation_version = sock.recv(1) |
| 149 | assert negotiation_version == b"\x01" |
| 150 | ulen = ord(sock.recv(1)) |
| 151 | provided_username = _read_exactly(sock, ulen) |
| 152 | plen = ord(sock.recv(1)) |
| 153 | provided_password = _read_exactly(sock, plen) |
| 154 | |
| 155 | if username == provided_username and password == provided_password: |
| 156 | sock.sendall(b"\x01\x00") |
| 157 | else: |
| 158 | sock.sendall(b"\x01\x01") |
| 159 | sock.close() |
| 160 | return |
| 161 | else: |
| 162 | assert SOCKS_NEGOTIATION_NONE in methods |
| 163 | send_data = SOCKS_VERSION_SOCKS5 + SOCKS_NEGOTIATION_NONE |
| 164 | sock.sendall(send_data) |
| 165 | |
| 166 | # Client sends where they want to go. |
| 167 | received_version = sock.recv(1) |
| 168 | command = sock.recv(1) |
| 169 | reserved = sock.recv(1) |
| 170 | addr = _address_from_socket(sock) |
| 171 | port_raw = _read_exactly(sock, 2) |
| 172 | port = (ord(port_raw[0:1]) << 8) + (ord(port_raw[1:2])) |
| 173 | |
| 174 | # Check some basic stuff. |
| 175 | assert received_version == SOCKS_VERSION_SOCKS5 |
| 176 | assert command == b"\x01" # Only support connect, not bind. |
| 177 | assert reserved == b"\x00" |
| 178 | |
| 179 | # Yield the address port tuple. |
| 180 | succeed = yield addr, port |
| 181 | |
| 182 | if succeed: |
no test coverage detected