MCPcopy
hub / github.com/openai/openai-python / _BedrockBearerAuth

Class _BedrockBearerAuth

src/openai/providers/bedrock.py:75–124  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

73
74
75class _BedrockBearerAuth:
76 def __init__(self, token_provider: BedrockTokenProvider, *, base_url: httpx.URL) -> None:
77 self._token_provider = token_provider
78 self._base_url = base_url
79
80 def _validate_request(self, request: httpx.Request) -> None:
81 _assert_provider_owns_authorization(request)
82 if not _same_origin(request.url, self._base_url):
83 raise OpenAIError(
84 "Refusing to authenticate a Bedrock request for an origin other than the configured provider URL."
85 )
86
87 def _resolve_token(self) -> str:
88 try:
89 token = cast(object, self._token_provider())
90 except OpenAIError:
91 raise
92 except Exception as exc:
93 raise OpenAIError("Failed to resolve a bearer credential for Bedrock.") from exc
94
95 if inspect.isawaitable(token):
96 close = getattr(token, "close", None)
97 if callable(close):
98 close()
99 raise OpenAIError("An async Bedrock token provider requires `AsyncOpenAI`.")
100 if not isinstance(token, str) or not token.strip():
101 raise OpenAIError("The Bedrock bearer credential provider must return a non-empty string.")
102 return token
103
104 async def _resolve_token_async(self) -> str:
105 try:
106 token = cast(object, self._token_provider())
107 if inspect.isawaitable(token):
108 token = await token
109 except OpenAIError:
110 raise
111 except Exception as exc:
112 raise OpenAIError("Failed to resolve a bearer credential for Bedrock.") from exc
113
114 if not isinstance(token, str) or not token.strip():
115 raise OpenAIError("The Bedrock bearer credential provider must return a non-empty string.")
116 return token
117
118 def prepare_request(self, request: httpx.Request) -> None:
119 self._validate_request(request)
120 request.headers["Authorization"] = f"Bearer {self._resolve_token()}"
121
122 async def prepare_async_request(self, request: httpx.Request) -> None:
123 self._validate_request(request)
124 request.headers["Authorization"] = f"Bearer {await self._resolve_token_async()}"
125
126
127class _BedrockSigV4Auth:

Callers 1

configureMethod · 0.85

Calls

no outgoing calls

Tested by

no test coverage detected