| 497 | return self._ctx.set_alpn_protos(protocols) # type: ignore[no-any-return] |
| 498 | |
| 499 | def wrap_socket( |
| 500 | self, |
| 501 | sock: socket_cls, |
| 502 | server_side: bool = False, |
| 503 | do_handshake_on_connect: bool = True, |
| 504 | suppress_ragged_eofs: bool = True, |
| 505 | server_hostname: bytes | str | None = None, |
| 506 | ) -> WrappedSocket: |
| 507 | cnx = OpenSSL.SSL.Connection(self._ctx, sock) |
| 508 | |
| 509 | # If server_hostname is an IP, don't use it for SNI, per RFC6066 Section 3 |
| 510 | if server_hostname and not util.ssl_.is_ipaddress(server_hostname): |
| 511 | if isinstance(server_hostname, str): |
| 512 | server_hostname = server_hostname.encode("utf-8") |
| 513 | cnx.set_tlsext_host_name(server_hostname) |
| 514 | |
| 515 | cnx.set_connect_state() |
| 516 | |
| 517 | while True: |
| 518 | try: |
| 519 | cnx.do_handshake() |
| 520 | except OpenSSL.SSL.WantReadError as e: |
| 521 | if not util.wait_for_read(sock, sock.gettimeout()): |
| 522 | raise TimeoutError("select timed out") from e |
| 523 | continue |
| 524 | except OpenSSL.SSL.Error as e: |
| 525 | raise ssl.SSLError(f"bad handshake: {e!r}") from e |
| 526 | break |
| 527 | |
| 528 | return WrappedSocket(cnx, sock) |
| 529 | |
| 530 | def _set_ctx_options(self) -> None: |
| 531 | self._ctx.set_options( |