(self)
| 1132 | ) |
| 1133 | |
def test_headers(self) -> None:
    """Check that configured proxy headers are sent on a proxied request.

    A one-shot raw-socket server echoes the request head it receives back
    as the response body, so the test can inspect what the client sent.
    """

    def echo_socket_handler(listener: socket.socket) -> None:
        """Accept one connection and echo its request head as the body."""
        sock = listener.accept()[0]
        # The context manager closes the socket even if recv/sendall raises.
        with sock:
            buf = b""
            while not buf.endswith(b"\r\n\r\n"):
                chunk = sock.recv(65536)
                if not chunk:
                    # Peer closed before finishing the request head; an
                    # empty read would otherwise spin this loop forever.
                    return
                buf += chunk

            # sendall() retries partial writes, whereas a bare send() may
            # transmit only part of the response. The request bytes are
            # echoed unchanged, so Content-Length is exactly len(buf).
            sock.sendall(
                b"HTTP/1.1 200 OK\r\n"
                b"Content-Type: text/plain\r\n"
                b"Content-Length: %d\r\n"
                b"\r\n" % len(buf)
                + buf
            )

    self._start_server(echo_socket_handler)
    base_url = f"http://{self.host}:{self.port}"

    # Define some proxy headers.
    proxy_headers = HTTPHeaderDict({"For The Proxy": "YEAH!"})
    with proxy_from_url(base_url, proxy_headers=proxy_headers) as proxy:
        conn = proxy.connection_from_url("http://www.google.com/")

        r = conn.urlopen("GET", "http://www.google.com/", assert_same_host=False)

        assert r.status == 200
        # FIXME: The order of the headers is not predictable right now. We
        # should fix that someday (maybe when we migrate to
        # OrderedDict/MultiDict).
        # The echoed request head must contain the proxy header verbatim.
        assert b"For The Proxy: YEAH!\r\n" in r.data
| 1168 | |
| 1169 | def test_retries(self) -> None: |
| 1170 | close_event = Event() |
nothing calls this directly
no test coverage detected