(
self,
request: httpx.Request,
*,
stream: bool,
retried: bool = False,
**kwargs: Unpack[HttpxSendArgs],
)
| 438 | return Querystring(array_format="brackets") |
| 439 | |
| 440 | def _send_with_auth_retry( |
| 441 | self, |
| 442 | request: httpx.Request, |
| 443 | *, |
| 444 | stream: bool, |
| 445 | retried: bool = False, |
| 446 | **kwargs: Unpack[HttpxSendArgs], |
| 447 | ) -> httpx.Response: |
| 448 | used_workload_identity_auth = False |
| 449 | |
| 450 | if self._workload_identity_auth is not None: |
| 451 | authorization = request.headers.get("Authorization") |
| 452 | if authorization == f"Bearer {WORKLOAD_IDENTITY_API_KEY_PLACEHOLDER}": |
| 453 | request.headers["Authorization"] = f"Bearer {self._workload_identity_auth.get_token()}" |
| 454 | used_workload_identity_auth = True |
| 455 | |
| 456 | response = super()._send_request(request, stream=stream, **kwargs) |
| 457 | if ( |
| 458 | response.status_code == 401 |
| 459 | and self._workload_identity_auth is not None |
| 460 | and used_workload_identity_auth |
| 461 | and not retried |
| 462 | ): |
| 463 | response.close() |
| 464 | self._workload_identity_auth.invalidate_token() |
| 465 | request.headers["Authorization"] = f"Bearer {self._workload_identity_auth.get_token()}" |
| 466 | return self._send_with_auth_retry(request, stream=stream, retried=True, **kwargs) |
| 467 | |
| 468 | return response |
| 469 | |
| 470 | @override |
| 471 | def _send_request( |
no test coverage detected