MCPcopy
hub / github.com/openai/openai-python / test_multiple_events

Function test_multiple_events

tests/test_streaming.py:63–80  ·  view source on GitHub ↗
(sync: bool, client: OpenAI, async_client: AsyncOpenAI)

Source from the content-addressed store, hash-verified

61@pytest.mark.asyncio
62@pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"])
63async def test_multiple_events(sync: bool, client: OpenAI, async_client: AsyncOpenAI) -> None:
64 def body() -> Iterator[bytes]:
65 yield b"event: ping\n"
66 yield b"\n"
67 yield b"event: completion\n"
68 yield b"\n"
69
70 iterator = make_event_iterator(content=body(), sync=sync, client=client, async_client=async_client)
71
72 sse = await iter_next(iterator)
73 assert sse.event == "ping"
74 assert sse.data == ""
75
76 sse = await iter_next(iterator)
77 assert sse.event == "completion"
78 assert sse.data == ""
79
80 await assert_empty_iter(iterator)
81
82
83@pytest.mark.asyncio

Callers

nothing calls this directly

Calls 4

make_event_iteratorFunction · 0.85
iter_nextFunction · 0.85
assert_empty_iterFunction · 0.85
bodyFunction · 0.70

Tested by

no test coverage detected