(sock)
| 206 | auth = requests.auth.HTTPDigestAuth('user', 'pass') |
| 207 | |
| 208 | def digest_failed_response_handler(sock): |
| 209 | # Respond to initial GET with a challenge. |
| 210 | request_content = consume_socket_content(sock, timeout=0.5) |
| 211 | assert request_content.startswith(b"GET / HTTP/1.1") |
| 212 | sock.send(text_401) |
| 213 | |
| 214 | # Verify we receive an Authorization header in response, then |
| 215 | # challenge again. |
| 216 | request_content = consume_socket_content(sock, timeout=0.5) |
| 217 | assert expected_digest in request_content |
| 218 | sock.send(text_401) |
| 219 | |
| 220 | # Verify the client didn't respond to second challenge. |
| 221 | request_content = consume_socket_content(sock, timeout=0.5) |
| 222 | assert request_content == b'' |
| 223 | |
| 224 | return request_content |
| 225 | |
| 226 | close_server = threading.Event() |
| 227 | server = Server(digest_failed_response_handler, wait_to_close_event=close_server) |
nothing calls this directly
no test coverage detected