MCPcopy
hub / github.com/openai/openai-python / test_token_provider_refresh_async

Function test_token_provider_refresh_async

tests/lib/test_bedrock.py:349–368  ·  view source on GitHub ↗
(respx_mock: MockRouter)

Source from the content-addressed store, hash-verified

347@pytest.mark.asyncio
348@pytest.mark.respx()
349async def test_token_provider_refresh_async(respx_mock: MockRouter) -> None:
350 respx_mock.post("https://example.com/openai/v1/responses").mock(
351 side_effect=[
352 httpx.Response(500, json={"error": "server error"}),
353 httpx.Response(200, json=RESPONSE_BODY),
354 ]
355 )
356 tokens = iter(["first", "second"])
357 client = AsyncBedrockOpenAI(
358 base_url="https://example.com/openai/v1",
359 bedrock_token_provider=lambda: next(tokens),
360 http_client=httpx.AsyncClient(trust_env=False),
361 max_retries=1,
362 )
363
364 await client.responses.create(model="gpt-4o", input="hello")
365
366 calls = cast("list[MockRequestCall]", respx_mock.calls)
367 assert calls[0].request.headers["Authorization"] == "Bearer first"
368 assert calls[1].request.headers["Authorization"] == "Bearer second"
369
370
371@pytest.mark.respx()

Callers

nothing calls this directly

Calls 3

AsyncBedrockOpenAIClass · 0.90
postMethod · 0.45
createMethod · 0.45

Tested by

no test coverage detected