(
self,
verify: ssl.SSLContext | str | bool = True,
cert: CertTypes | None = None,
trust_env: bool = True,
http1: bool = True,
http2: bool = False,
limits: Limits = DEFAULT_LIMITS,
proxy: ProxyTypes | None = None,
uds: str | None = None,
local_address: str | None = None,
retries: int = 0,
socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
)
| 134 | |
| 135 | class HTTPTransport(BaseTransport): |
| 136 | def __init__( |
| 137 | self, |
| 138 | verify: ssl.SSLContext | str | bool = True, |
| 139 | cert: CertTypes | None = None, |
| 140 | trust_env: bool = True, |
| 141 | http1: bool = True, |
| 142 | http2: bool = False, |
| 143 | limits: Limits = DEFAULT_LIMITS, |
| 144 | proxy: ProxyTypes | None = None, |
| 145 | uds: str | None = None, |
| 146 | local_address: str | None = None, |
| 147 | retries: int = 0, |
| 148 | socket_options: typing.Iterable[SOCKET_OPTION] | None = None, |
| 149 | ) -> None: |
| 150 | import httpcore |
| 151 | |
| 152 | proxy = Proxy(url=proxy) if isinstance(proxy, (str, URL)) else proxy |
| 153 | ssl_context = create_ssl_context(verify=verify, cert=cert, trust_env=trust_env) |
| 154 | |
| 155 | if proxy is None: |
| 156 | self._pool = httpcore.ConnectionPool( |
| 157 | ssl_context=ssl_context, |
| 158 | max_connections=limits.max_connections, |
| 159 | max_keepalive_connections=limits.max_keepalive_connections, |
| 160 | keepalive_expiry=limits.keepalive_expiry, |
| 161 | http1=http1, |
| 162 | http2=http2, |
| 163 | uds=uds, |
| 164 | local_address=local_address, |
| 165 | retries=retries, |
| 166 | socket_options=socket_options, |
| 167 | ) |
| 168 | elif proxy.url.scheme in ("http", "https"): |
| 169 | self._pool = httpcore.HTTPProxy( |
| 170 | proxy_url=httpcore.URL( |
| 171 | scheme=proxy.url.raw_scheme, |
| 172 | host=proxy.url.raw_host, |
| 173 | port=proxy.url.port, |
| 174 | target=proxy.url.raw_path, |
| 175 | ), |
| 176 | proxy_auth=proxy.raw_auth, |
| 177 | proxy_headers=proxy.headers.raw, |
| 178 | ssl_context=ssl_context, |
| 179 | proxy_ssl_context=proxy.ssl_context, |
| 180 | max_connections=limits.max_connections, |
| 181 | max_keepalive_connections=limits.max_keepalive_connections, |
| 182 | keepalive_expiry=limits.keepalive_expiry, |
| 183 | http1=http1, |
| 184 | http2=http2, |
| 185 | socket_options=socket_options, |
| 186 | ) |
| 187 | elif proxy.url.scheme in ("socks5", "socks5h"): |
| 188 | try: |
| 189 | import socksio # noqa |
| 190 | except ImportError: # pragma: no cover |
| 191 | raise ImportError( |
| 192 | "Using SOCKS proxy, but the 'socksio' package is not installed. " |
| 193 | "Make sure to install httpx using `pip install httpx[socks]`." |
Nothing calls `HTTPTransport.__init__` directly.
No test coverage was detected for it.