MCPcopy
hub / github.com/openai/openai-python / _configure_realtime

Method _configure_realtime

src/openai/lib/azure.py:395–418  ·  view source on GitHub ↗
(self, model: str, extra_query: Query)

Source from the content-addressed store, hash-verified

393 return options
394
395 def _configure_realtime(self, model: str, extra_query: Query) -> tuple[httpx.URL, dict[str, str]]:
396 auth_headers = {}
397 query = {
398 **extra_query,
399 "api-version": self._api_version,
400 "deployment": self._azure_deployment or model,
401 }
402 if self.api_key and self.api_key != "<missing API key>":
403 auth_headers = {"api-key": self.api_key}
404 else:
405 token = self._get_azure_ad_token()
406 if token:
407 auth_headers = {"Authorization": f"Bearer {token}"}
408
409 if self.websocket_base_url is not None:
410 base_url = httpx.URL(self.websocket_base_url)
411 merge_raw_path = base_url.raw_path.rstrip(b"/") + b"/realtime"
412 realtime_url = base_url.copy_with(raw_path=merge_raw_path)
413 else:
414 base_url = self._prepare_url("/realtime")
415 realtime_url = base_url.copy_with(scheme="wss")
416
417 url = realtime_url.copy_with(params={**query})
418 return url, auth_headers
419
420
421class AsyncAzureOpenAI(BaseAzureClient[httpx.AsyncClient, AsyncStream[Any]], AsyncOpenAI):

Callers 6

__aenter__Method · 0.45
__enter__Method · 0.45
_connect_wsMethod · 0.45
_connect_wsMethod · 0.45

Calls 2

_get_azure_ad_tokenMethod · 0.95
_prepare_urlMethod · 0.45

Tested by 2