MCPcopy
hub / github.com/openai/openai-python / get_token

Method get_token

src/openai/auth/_workload.py:190–217  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

188 self._condition = threading.Condition(self._lock)
189
190 def get_token(self) -> str:
191 with self._lock:
192 while self._refreshing and self._token_unusable():
193 self._condition.wait()
194
195 if not self._token_unusable() and not self._needs_refresh():
196 return cast(str, self._cached_token)
197
198 if self._refreshing:
199 while self._refreshing:
200 self._condition.wait()
201 token = self._cached_token # type: ignore[unreachable]
202 if self._token_unusable():
203 raise RuntimeError("Token is unusable after refresh completed")
204 return cast(str, token)
205
206 self._refreshing = True
207
208 try:
209 self._perform_refresh()
210 with self._lock:
211 if self._token_unusable():
212 raise RuntimeError("Token is unusable after refresh completed")
213 return cast(str, self._cached_token)
214 finally:
215 with self._lock:
216 self._refreshing = False
217 self._condition.notify_all()
218
219 async def get_token_async(self) -> str:
220 return await to_thread(self.get_token)

Callers 1

_send_with_auth_retryMethod · 0.80

Calls 3

_token_unusableMethod · 0.95
_needs_refreshMethod · 0.95
_perform_refreshMethod · 0.95

Tested by

no test coverage detected