Assert the HTTPSPoolKey fields are honored when selecting a pool.
(self)
| 119 | assert all(isinstance(key, PoolKey) for key in p.pools.keys()) |
| 120 | |
| 121 | def test_https_pool_key_fields(self) -> None: |
| 122 | """Assert the HTTPSPoolKey fields are honored when selecting a pool.""" |
| 123 | connection_pool_kw = { |
| 124 | "timeout": timeout.Timeout(3.14), |
| 125 | "retries": retry.Retry(total=6, connect=2), |
| 126 | "block": True, |
| 127 | "source_address": "127.0.0.1", |
| 128 | "key_file": "/root/totally_legit.key", |
| 129 | "cert_file": "/root/totally_legit.crt", |
| 130 | "cert_reqs": "CERT_REQUIRED", |
| 131 | "ca_certs": "/root/path_to_pem", |
| 132 | "ssl_version": "SSLv23_METHOD", |
| 133 | "blocksize": _DEFAULT_BLOCKSIZE + 1, |
| 134 | } |
| 135 | p = PoolManager() |
| 136 | conn_pools = [ |
| 137 | p.connection_from_url("https://example.com/"), |
| 138 | p.connection_from_url("https://example.com:4333/"), |
| 139 | p.connection_from_url("https://other.example.com/"), |
| 140 | ] |
| 141 | # Asking for a connection pool with the same key should give us an |
| 142 | # existing pool. |
| 143 | dup_pools = [] |
| 144 | |
| 145 | for key, value in connection_pool_kw.items(): |
| 146 | p.connection_pool_kw[key] = value |
| 147 | conn_pools.append(p.connection_from_url("https://example.com/")) |
| 148 | dup_pools.append(p.connection_from_url("https://example.com/")) |
| 149 | |
| 150 | assert all( |
| 151 | x is not y |
| 152 | for i, x in enumerate(conn_pools) |
| 153 | for j, y in enumerate(conn_pools) |
| 154 | if i != j |
| 155 | ) |
| 156 | assert all(pool in conn_pools for pool in dup_pools) |
| 157 | assert all(isinstance(key, PoolKey) for key in p.pools.keys()) |
| 158 | |
| 159 | def test_default_pool_key_funcs_copy(self) -> None: |
| 160 | """Assert each PoolManager gets a copy of ``pool_keys_by_scheme``.""" |
nothing calls this directly
no test coverage detected