(
self,
verify: ssl.SSLContext | str | bool = True,
cert: CertTypes | None = None,
trust_env: bool = True,
http1: bool = True,
http2: bool = False,
limits: Limits = DEFAULT_LIMITS,
proxy: ProxyTypes | None = None,
uds: str | None = None,
local_address: str | None = None,
retries: int = 0,
socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
)
| 278 | |
| 279 | class AsyncHTTPTransport(AsyncBaseTransport): |
| 280 | def __init__( |
| 281 | self, |
| 282 | verify: ssl.SSLContext | str | bool = True, |
| 283 | cert: CertTypes | None = None, |
| 284 | trust_env: bool = True, |
| 285 | http1: bool = True, |
| 286 | http2: bool = False, |
| 287 | limits: Limits = DEFAULT_LIMITS, |
| 288 | proxy: ProxyTypes | None = None, |
| 289 | uds: str | None = None, |
| 290 | local_address: str | None = None, |
| 291 | retries: int = 0, |
| 292 | socket_options: typing.Iterable[SOCKET_OPTION] | None = None, |
| 293 | ) -> None: |
| 294 | import httpcore |
| 295 | |
| 296 | proxy = Proxy(url=proxy) if isinstance(proxy, (str, URL)) else proxy |
| 297 | ssl_context = create_ssl_context(verify=verify, cert=cert, trust_env=trust_env) |
| 298 | |
| 299 | if proxy is None: |
| 300 | self._pool = httpcore.AsyncConnectionPool( |
| 301 | ssl_context=ssl_context, |
| 302 | max_connections=limits.max_connections, |
| 303 | max_keepalive_connections=limits.max_keepalive_connections, |
| 304 | keepalive_expiry=limits.keepalive_expiry, |
| 305 | http1=http1, |
| 306 | http2=http2, |
| 307 | uds=uds, |
| 308 | local_address=local_address, |
| 309 | retries=retries, |
| 310 | socket_options=socket_options, |
| 311 | ) |
| 312 | elif proxy.url.scheme in ("http", "https"): |
| 313 | self._pool = httpcore.AsyncHTTPProxy( |
| 314 | proxy_url=httpcore.URL( |
| 315 | scheme=proxy.url.raw_scheme, |
| 316 | host=proxy.url.raw_host, |
| 317 | port=proxy.url.port, |
| 318 | target=proxy.url.raw_path, |
| 319 | ), |
| 320 | proxy_auth=proxy.raw_auth, |
| 321 | proxy_headers=proxy.headers.raw, |
| 322 | proxy_ssl_context=proxy.ssl_context, |
| 323 | ssl_context=ssl_context, |
| 324 | max_connections=limits.max_connections, |
| 325 | max_keepalive_connections=limits.max_keepalive_connections, |
| 326 | keepalive_expiry=limits.keepalive_expiry, |
| 327 | http1=http1, |
| 328 | http2=http2, |
| 329 | socket_options=socket_options, |
| 330 | ) |
| 331 | elif proxy.url.scheme in ("socks5", "socks5h"): |
| 332 | try: |
| 333 | import socksio # noqa |
| 334 | except ImportError: # pragma: no cover |
| 335 | raise ImportError( |
| 336 | "Using SOCKS proxy, but the 'socksio' package is not installed. " |
| 337 | "Make sure to install httpx using `pip install httpx[socks]`." |
Nothing calls this directly.
No test coverage detected.