(self)
| 5587 | # The important thing is that our CI has the logic covered. |
| 5588 | |
| 5589 | def test_preauth_data_to_tls_server(self): |
| 5590 | server_accept_called = threading.Event() |
| 5591 | ready_for_server_wrap_socket = threading.Event() |
| 5592 | |
| 5593 | def call_after_accept(unused): |
| 5594 | server_accept_called.set() |
| 5595 | if not ready_for_server_wrap_socket.wait(support.SHORT_TIMEOUT): |
| 5596 | raise RuntimeError("wrap_socket event never set, test may fail.") |
| 5597 | return False # Tell the server thread to continue. |
| 5598 | |
| 5599 | server = self.SingleConnectionTestServerThread( |
| 5600 | call_after_accept=call_after_accept, |
| 5601 | name="preauth_data_to_tls_server") |
| 5602 | self.enterContext(server) # starts it & unittest.TestCase stops it. |
| 5603 | |
| 5604 | with socket.socket() as client: |
| 5605 | client.connect(server.listener.getsockname()) |
| 5606 | # This forces an immediate connection close via RST on .close(). |
| 5607 | set_socket_so_linger_on_with_zero_timeout(client) |
| 5608 | client.setblocking(False) |
| 5609 | |
| 5610 | server_accept_called.wait() |
| 5611 | client.send(b"DELETE /data HTTP/1.0\r\n\r\n") |
| 5612 | client.close() # RST |
| 5613 | |
| 5614 | ready_for_server_wrap_socket.set() |
| 5615 | server.join() |
| 5616 | |
| 5617 | wrap_error = server.wrap_error |
| 5618 | server.wrap_error = None |
| 5619 | try: |
| 5620 | self.assertEqual(b"", server.received_data) |
| 5621 | self.assertIsInstance(wrap_error, OSError) # All platforms. |
| 5622 | self.non_linux_skip_if_other_okay_error(wrap_error) |
| 5623 | self.assertIsInstance(wrap_error, ssl.SSLError) |
| 5624 | self.assertIn("before TLS handshake with data", wrap_error.args[1]) |
| 5625 | self.assertIn("before TLS handshake with data", wrap_error.reason) |
| 5626 | self.assertNotEqual(0, wrap_error.args[0]) |
| 5627 | self.assertIsNone(wrap_error.library, msg="attr must exist") |
| 5628 | finally: |
| 5629 | # gh-108342: Explicitly break the reference cycle |
| 5630 | wrap_error = None |
| 5631 | server = None |
| 5632 | |
| 5633 | def test_preauth_data_to_tls_client(self): |
| 5634 | server_can_continue_with_wrap_socket = threading.Event() |
nothing calls this directly
no test coverage detected