(self, options: FinalRequestOptions)
| 372 | |
| 373 | @override |
| 374 | def _prepare_options(self, options: FinalRequestOptions) -> FinalRequestOptions: |
| 375 | headers: dict[str, str | Omit] = {**options.headers} if is_given(options.headers) else {} |
| 376 | |
| 377 | options = model_copy(options) |
| 378 | options.headers = headers |
| 379 | |
| 380 | azure_ad_token = self._get_azure_ad_token() |
| 381 | if azure_ad_token is not None: |
| 382 | if not _has_header(headers, class="st">"Authorization"): |
| 383 | headers[class="st">"Authorization"] = fclass="st">"Bearer {azure_ad_token}" |
| 384 | elif self.api_key and self.api_key != API_KEY_SENTINEL: |
| 385 | if not _has_header(headers, class="st">"api-key"): |
| 386 | headers[class="st">"api-key"] = self.api_key |
| 387 | elif _has_auth_header(headers) or _has_auth_header(self.default_headers): |
| 388 | pass |
| 389 | else: |
| 390 | class="cm"># should never be hit |
| 391 | raise ValueError(class="st">"Unable to handle auth") |
| 392 | |
| 393 | return options |
| 394 | |
| 395 | def _configure_realtime(self, model: str, extra_query: Query) -> tuple[httpx.URL, dict[str, str]]: |
| 396 | auth_headers = {} |
nothing calls this directly
no test coverage detected