(
self,
request: httpx.Request,
*,
stream: bool,
retried: bool = False,
**kwargs: Unpack[HttpxSendArgs],
)
| 1034 | return Querystring(array_format="brackets") |
| 1035 | |
| 1036 | async def _send_with_auth_retry( |
| 1037 | self, |
| 1038 | request: httpx.Request, |
| 1039 | *, |
| 1040 | stream: bool, |
| 1041 | retried: bool = False, |
| 1042 | **kwargs: Unpack[HttpxSendArgs], |
| 1043 | ) -> httpx.Response: |
| 1044 | used_workload_identity_auth = False |
| 1045 | |
| 1046 | if self._workload_identity_auth is not None: |
| 1047 | authorization = request.headers.get("Authorization") |
| 1048 | if authorization == f"Bearer {WORKLOAD_IDENTITY_API_KEY_PLACEHOLDER}": |
| 1049 | request.headers["Authorization"] = f"Bearer {await self._workload_identity_auth.get_token_async()}" |
| 1050 | used_workload_identity_auth = True |
| 1051 | |
| 1052 | response = await super()._send_request(request, stream=stream, **kwargs) |
| 1053 | if ( |
| 1054 | response.status_code == 401 |
| 1055 | and self._workload_identity_auth is not None |
| 1056 | and used_workload_identity_auth |
| 1057 | and not retried |
| 1058 | ): |
| 1059 | await response.aclose() |
| 1060 | self._workload_identity_auth.invalidate_token() |
| 1061 | request.headers["Authorization"] = f"Bearer {await self._workload_identity_auth.get_token_async()}" |
| 1062 | return await self._send_with_auth_retry(request, stream=stream, retried=True, **kwargs) |
| 1063 | |
| 1064 | return response |
| 1065 | |
| 1066 | @override |
| 1067 | async def _send_request( |
no test coverage detected