Fetch the OAuth 2.0 Authorization Server Metadata (RFC 8414), caching for the session.
(issuer: str = ISSUER)
| 73 | |
| 74 | |
def discover(issuer: str = ISSUER) -> dict[str, Any]:
    """Fetch the OAuth 2.0 Authorization Server Metadata (RFC 8414), caching for the session.

    The document is requested from ``issuer`` (trailing slashes stripped)
    followed by ``DISCOVERY_PATH`` and, once validated, stored in the
    module-level ``_discovery_cache``.

    NOTE: the cache holds a single document and is not keyed by ``issuer``;
    once it is populated, every call returns that document regardless of the
    ``issuer`` argument.

    Args:
        issuer: Base URL that ``DISCOVERY_PATH`` is appended to.

    Returns:
        The parsed metadata document: a JSON object containing at least
        ``authorization_endpoint`` and ``token_endpoint``.

    Raises:
        RuntimeError: If the endpoint cannot be reached or read, responds
            with a non-JSON content type, returns a body that is not valid
            JSON, returns JSON that is not an object, or omits a required
            field.
    """
    global _discovery_cache  # noqa: PLW0603
    if _discovery_cache is not None:
        return _discovery_cache

    url = issuer.rstrip("/") + DISCOVERY_PATH
    req = urllib.request.Request(url, headers={"Accept": "application/json"})
    try:
        with urllib.request.urlopen(req, timeout=15) as resp:
            content_type = resp.headers.get("Content-Type", "")
            body = resp.read()
    except OSError as exc:
        # URLError/HTTPError are OSError subclasses; catching the base class
        # also covers timeouts and connection resets raised while reading the
        # body, which urllib does not wrap in URLError.
        raise RuntimeError(f"Could not reach OAuth metadata endpoint at {url}: {exc}") from exc

    # Media types are case-insensitive, so compare on a lowercased copy while
    # reporting the header exactly as the server sent it.
    if "json" not in content_type.lower():
        raise RuntimeError(
            f"OAuth metadata endpoint at {url} returned {content_type or 'unknown content type'} "
            f"instead of JSON. Is the OAuth provider configured at {issuer}?"
        )

    try:
        doc = json.loads(body)
    except ValueError as exc:
        # ValueError covers json.JSONDecodeError and the UnicodeDecodeError
        # raised when the raw bytes cannot be decoded.
        raise RuntimeError(f"OAuth metadata endpoint at {url} returned invalid JSON: {exc}") from exc

    # The membership checks below are only meaningful on a JSON object; a
    # list, string or number would be misreported or raise TypeError.
    if not isinstance(doc, dict):
        raise RuntimeError(
            f"OAuth metadata endpoint at {url} returned a JSON {type(doc).__name__} "
            "instead of an object"
        )

    for key in ("authorization_endpoint", "token_endpoint"):
        if key not in doc:
            raise RuntimeError(f"OAuth metadata missing required field: {key}")

    _discovery_cache = doc
    return doc
| 107 | |
| 108 | |
| 109 | # --------------------------------------------------------------------------- |
no test coverage detected