Send an HTTP/2 request
request( # type: ignore[override]
self,
method: str,
url: str,
body: _TYPE_BODY | None = None,
headers: typing.Mapping[str, str] | None = None,
*,
preload_content: bool = True,
decode_content: bool = True,
enforce_content_length: bool = True,
**kwargs: typing.Any,
)
| 266 | ) |
| 267 | |
def request(  # type: ignore[override]
    self,
    method: str,
    url: str,
    body: _TYPE_BODY | None = None,
    headers: typing.Mapping[str, str] | None = None,
    *,
    preload_content: bool = True,
    decode_content: bool = True,
    enforce_content_length: bool = True,
    **kwargs: typing.Any,
) -> None:
    """Send an HTTP/2 request.

    Applies ``self.timeout`` to the socket (when one is open), starts the
    request with ``putrequest``, forwards the caller's headers through
    ``putheader``, adds a default ``user-agent`` if none was supplied, and
    finishes with ``endheaders`` (followed by ``send`` when there is a body).

    Args:
        method: HTTP method, passed to ``putrequest``.
        url: Request target, passed to ``putrequest``.
        body: Optional request body. A falsy body (``None``, ``b""``, ...)
            is treated as "no body": ``endheaders()`` is called without a
            ``message_body`` and ``send`` is not called.
        headers: Optional request headers. A ``Transfer-Encoding: chunked``
            entry is dropped instead of being forwarded.
        preload_content: Accepted for signature compatibility; not used by
            this method.
        decode_content: Accepted for signature compatibility; not used by
            this method.
        enforce_content_length: Accepted for signature compatibility; not
            used by this method.
        **kwargs: Extra keyword arguments are accepted and ignored.
    """
    # TODO: `chunked` is often present in kwargs from upstream callers. It is
    # deliberately accepted and ignored here rather than rejected with
    # NotImplementedError, even though chunked framing is not used on HTTP/2.

    if self.sock is not None:
        self.sock.settimeout(self.timeout)

    self.putrequest(method, url)

    for name, value in (headers or {}).items():
        # Transfer-coding names are case-insensitive, and HTTP/2 does not
        # allow the Transfer-Encoding field, so match "chunked" regardless
        # of case or surrounding whitespace before dropping it.
        is_chunked_transfer_encoding = (
            name.lower() == "transfer-encoding"
            and isinstance(value, str)
            and value.strip().lower() == "chunked"
        )
        if not is_chunked_transfer_encoding:
            self.putheader(name, value)

    # NOTE(review): this lookup assumes header names are stored in
    # self._headers as lower-cased bytes -- confirm against putheader.
    if not any(key == b"user-agent" for key, _ in self._headers):
        self.putheader(b"user-agent", _get_default_user_agent())

    if body:
        self.endheaders(message_body=body)
        self.send(body)
    else:
        self.endheaders()
| 307 | def close(self) -> None: |
| 308 | with self._h2_conn as conn: |