github.com/openai/openai-python / test_enqueue_and_drain

Method test_enqueue_and_drain

tests/test_send_queue.py:12–20
(self)

Source from the content-addressed store, hash-verified

class TestSendQueue:
    def test_enqueue_and_drain(self) -> None:
        q = SendQueue()
        q.enqueue('{"type": "session.update"}')
        q.enqueue('{"type": "response.create"}')
        assert len(q) == 2

        items = q.drain()
        assert items == ['{"type": "session.update"}', '{"type": "response.create"}']
        assert len(q) == 0

    def test_enqueue_respects_byte_limit(self) -> None:
        q = SendQueue(max_bytes=10)

Callers

nothing calls this directly

Calls 3

enqueue · Method · 0.95
drain · Method · 0.95
SendQueue · Class · 0.90
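
To make the three callees above concrete, here is a minimal sketch of a queue that would satisfy the assertions in test_enqueue_and_drain. It is a hypothetical stand-in, not the library's SendQueue: the internal fields, the return value of enqueue, and the behavior when max_bytes is exceeded are all assumptions, since the source shown here only reveals the constructor argument and the enqueue, drain, and len calls.

```python
from __future__ import annotations

from collections import deque


class SendQueue:
    """Hypothetical FIFO queue of outgoing messages (illustration only)."""

    def __init__(self, max_bytes: int | None = None) -> None:
        self._items: deque[str] = deque()
        self._bytes = 0  # running UTF-8 size of queued messages
        self._max_bytes = max_bytes  # None means no limit (assumed)

    def enqueue(self, data: str) -> bool:
        """Append a message; return False if it would exceed the byte limit.

        Rejecting oversize messages is an assumption; the real class may
        raise or evict instead.
        """
        size = len(data.encode("utf-8"))
        if self._max_bytes is not None and self._bytes + size > self._max_bytes:
            return False
        self._items.append(data)
        self._bytes += size
        return True

    def drain(self) -> list[str]:
        """Return all queued messages in insertion order and empty the queue."""
        items = list(self._items)
        self._items.clear()
        self._bytes = 0
        return items

    def __len__(self) -> int:
        return len(self._items)


# Mirrors the flow of the test: enqueue twice, drain once.
q = SendQueue()
q.enqueue('{"type": "session.update"}')
q.enqueue('{"type": "response.create"}')
drained = q.drain()
```

A deque is used here only because it gives cheap appends and preserves order; a plain list would serve equally well for a drain-everything queue.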

Tested by

no test coverage detected