MCPcopy
hub / github.com/openai/openai-python / make_async_snapshot_request

Function make_async_snapshot_request

tests/lib/snapshots.py:60–100  ·  view source on GitHub ↗
(
    func: Callable[[AsyncOpenAI], Awaitable[_T]],
    *,
    content_snapshot: Any,
    respx_mock: MockRouter,
    mock_client: AsyncOpenAI,
    path: str,
)

Source from the content-addressed store, hash-verified

58
59
async def make_async_snapshot_request(
    func: Callable[[AsyncOpenAI], Awaitable[_T]],
    *,
    content_snapshot: Any,
    respx_mock: MockRouter,
    mock_client: AsyncOpenAI,
    path: str,
) -> _T:
    """Run ``func`` against the live API or against a mocked snapshot response.

    When the ``OPENAI_LIVE`` environment variable equals ``"1"``, the respx
    router is stopped and ``func`` receives a newly constructed
    ``AsyncOpenAI`` client whose HTTP response hook compares every response
    body (parsed and re-serialised as JSON) with ``content_snapshot``.
    Otherwise a ``POST`` route for ``path`` is registered on ``respx_mock``
    that answers ``200`` with the snapshot value as a JSON body, and ``func``
    receives ``mock_client``.

    Args:
        func: Coroutine function invoked with the selected client.
        content_snapshot: Snapshot of the JSON response body.
        respx_mock: Router used to mock the request in non-live mode; stopped
            in live mode.
        mock_client: Client handed to ``func`` in non-live mode.
        path: Path of the ``POST`` route mocked in non-live mode.

    Returns:
        Whatever ``func`` returns.

    Raises:
        AssertionError: In live mode, if a response body does not match
            ``content_snapshot``.
    """
    live = os.environ.get("OPENAI_LIVE") == "1"
    if live:

        async def _on_response(response: httpx.Response) -> None:
            # update the content snapshot
            # NOTE(review): presumably the snapshot library records the new
            # value through this comparison when run in update mode - confirm.
            assert json.dumps(json.loads(await response.aread())) == content_snapshot

        respx_mock.stop()

        client = AsyncOpenAI(
            http_client=httpx.AsyncClient(
                event_hooks={
                    "response": [_on_response],
                }
            )
        )
    else:
        respx_mock.post(path).mock(
            return_value=httpx.Response(
                200,
                content=get_snapshot_value(content_snapshot),
                headers={"content-type": "application/json"},
            )
        )

        client = mock_client

    try:
        return await func(client)
    finally:
        # Close only the client created above; ``mock_client`` was supplied by
        # the caller and is left open. Using ``finally`` ensures the live
        # client is released even when ``func`` or the response hook raises.
        if live:
            await client.close()

Calls 5

AsyncOpenAI (Class) · 0.90
stop (Method) · 0.80
get (Method) · 0.45
post (Method) · 0.45
close (Method) · 0.45

Tested by 1