(self)
| 226 | self._cached_token_refresh_at_monotonic = None |
| 227 | |
def _perform_refresh(self) -> None:
    """Fetch a fresh token and publish it to the cache as a single update.

    Calls ``self._fetch_token_from_exchange()`` and reads the
    ``"access_token"`` and ``"expires_in"`` entries of the returned mapping.
    ``expires_in`` is added to a ``time.monotonic()`` reading, so it is
    treated as a duration in seconds. Under ``self._lock`` the method then
    stores:

    * ``_cached_token`` -- the new access token,
    * ``_cached_token_expires_at_monotonic`` -- ``now + expires_in``,
    * ``_cached_token_refresh_at_monotonic`` --
      ``now + self._refresh_delay_seconds(expires_in)``.

    Every derived value is computed before the first attribute is assigned.
    If anything raises, the previously cached token and both deadlines are
    left untouched rather than a new token being paired with stale deadlines.

    Raises:
        KeyError: if ``"expires_in"`` or ``"access_token"`` is missing from
            the fetched data.
        TypeError: if ``expires_in`` cannot be added to a float.
        Exception: anything raised by ``_fetch_token_from_exchange`` or
            ``_refresh_delay_seconds`` propagates unchanged.
    """
    # The fetch happens before the lock is taken, so the lock is held only
    # for the state update itself.
    token_data = self._fetch_token_from_exchange()

    # The clock is sampled after the fetch returns: the lifetime is counted
    # from the moment the response was received.
    now = time.monotonic()
    expires_in = token_data["expires_in"]
    access_token = token_data["access_token"]
    expires_at = now + expires_in

    with self._lock:
        # Still evaluated under the lock (as before), but ahead of any
        # assignment so a failure here cannot leave a half-updated cache.
        refresh_at = now + self._refresh_delay_seconds(expires_in)

        self._cached_token = access_token
        self._cached_token_expires_at_monotonic = expires_at
        self._cached_token_refresh_at_monotonic = refresh_at
| 237 | |
| 238 | def _fetch_token_from_exchange(self) -> dict[str, Any]: |
| 239 | subject_token = self._get_subject_token() |
no test coverage detected