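Try to convert an ``ssl_options`` dictionary to an `~ssl.SSLContext` object. The ``ssl_options`` argument may be either an `ssl.SSLContext` object or a dictionary containing keywords to be passed to ``ssl.SSLContext.wrap_socket``. This function converts the dict form to its `~ssl.SSLContext` equivalent, and may be used when a component which accepts both forms needs to upgrade to the `~ssl.SSLContext` version to use features like SNI or ALPN.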
(
ssl_options: Union[Dict[str, Any], ssl.SSLContext],
server_side: Optional[bool] = None,
)
| 588 | |
| 589 | |
def ssl_options_to_context(
    ssl_options: Union[Dict[str, Any], ssl.SSLContext],
    server_side: Optional[bool] = None,
) -> ssl.SSLContext:
    """Return an `~ssl.SSLContext` equivalent to ``ssl_options``.

    ``ssl_options`` is either an existing `ssl.SSLContext`, which is returned
    unchanged, or a dictionary of the keywords that would be passed to
    ``ssl.SSLContext.wrap_socket``. The dictionary form is translated into a
    newly created context, so a component that accepts both forms can upgrade
    to the `~ssl.SSLContext` version to use features like SNI or ALPN.

    When the dictionary has no ``"ssl_version"`` entry, the protocol is chosen
    from ``server_side``: ``PROTOCOL_TLS_SERVER`` for a true value,
    ``PROTOCOL_TLS_CLIENT`` for a false value other than ``None``, and
    ``PROTOCOL_TLS`` when it is ``None``.

    .. versionchanged:: 6.2

       Added server_side argument. Omitting this argument will result in a
       DeprecationWarning on Python 3.10.

    """
    # An already-built context needs no conversion.
    if isinstance(ssl_options, ssl.SSLContext):
        return ssl_options
    assert isinstance(ssl_options, dict)
    assert all(k in _SSL_CONTEXT_KEYWORDS for k in ssl_options), ssl_options

    # TODO: Now that we have the server_side argument, can we switch to
    # create_default_context or would that change behavior?
    if server_side is None:
        fallback_protocol = ssl.PROTOCOL_TLS
    elif server_side:
        fallback_protocol = ssl.PROTOCOL_TLS_SERVER
    else:
        fallback_protocol = ssl.PROTOCOL_TLS_CLIENT
    protocol = ssl_options.get("ssl_version", fallback_protocol)
    context = ssl.SSLContext(protocol)

    if "certfile" in ssl_options:
        keyfile = ssl_options.get("keyfile", None)
        context.load_cert_chain(ssl_options["certfile"], keyfile)

    if "cert_reqs" in ssl_options:
        verify_mode = ssl_options["cert_reqs"]
        if verify_mode == ssl.CERT_NONE:
            # check_hostname may have been set automatically by
            # PROTOCOL_TLS_CLIENT, but it is incompatible with CERT_NONE, so
            # it has to be cleared before the verify mode is assigned.
            context.check_hostname = False
        context.verify_mode = verify_mode

    if "ca_certs" in ssl_options:
        context.load_verify_locations(ssl_options["ca_certs"])

    if "ciphers" in ssl_options:
        context.set_ciphers(ssl_options["ciphers"])

    # Disable TLS compression to avoid CRIME and related attacks.
    # This constant depends on openssl version 1.0.
    # TODO: Do we need to do this ourselves or can we trust
    # the defaults?
    no_compression = getattr(ssl, "OP_NO_COMPRESSION", None)
    if no_compression is not None:
        context.options |= no_compression

    return context
| 641 | |
| 642 | |
| 643 | def ssl_wrap_socket( |