MCPcopy
hub / github.com/openai/openai-python / _send_with_auth_retry

Method _send_with_auth_retry

src/openai/_client.py:1036–1064  ·  view source on GitHub ↗
(
        self,
        request: httpx.Request,
        *,
        stream: bool,
        retried: bool = False,
        **kwargs: Unpack[HttpxSendArgs],
    )

Source from the content-addressed store, hash-verified

1034 return Querystring(array_format="brackets")
1035
1036 async def _send_with_auth_retry(
1037 self,
1038 request: httpx.Request,
1039 *,
1040 stream: bool,
1041 retried: bool = False,
1042 **kwargs: Unpack[HttpxSendArgs],
1043 ) -> httpx.Response:
1044 used_workload_identity_auth = False
1045
1046 if self._workload_identity_auth is not None:
1047 authorization = request.headers.get("Authorization")
1048 if authorization == f"Bearer {WORKLOAD_IDENTITY_API_KEY_PLACEHOLDER}":
1049 request.headers["Authorization"] = f"Bearer {await self._workload_identity_auth.get_token_async()}"
1050 used_workload_identity_auth = True
1051
1052 response = await super()._send_request(request, stream=stream, **kwargs)
1053 if (
1054 response.status_code == 401
1055 and self._workload_identity_auth is not None
1056 and used_workload_identity_auth
1057 and not retried
1058 ):
1059 await response.aclose()
1060 self._workload_identity_auth.invalidate_token()
1061 request.headers["Authorization"] = f"Bearer {await self._workload_identity_auth.get_token_async()}"
1062 return await self._send_with_auth_retry(request, stream=stream, retried=True, **kwargs)
1063
1064 return response
1065
1066 @override
1067 async def _send_request(

Callers 1

_send_requestMethod · 0.95

Calls 5

get_token_asyncMethod · 0.80
acloseMethod · 0.80
invalidate_tokenMethod · 0.80
getMethod · 0.45
_send_requestMethod · 0.45

Tested by

no test coverage detected