hub / github.com/openai/openai-python / enqueue

Method enqueue

src/openai/_send_queue.py:25–36

Append *data* to the queue. Raises :class:`WebSocketQueueFullError` if the message would exceed the byte-size limit.

(self, data: str)

Source from the content-addressed store, hash-verified

23         self._lock = threading.Lock()
24
25     def enqueue(self, data: str) -> None:
26         """Append *data* to the queue.
27
28         Raises :class:`WebSocketQueueFullError` if the message would
29         exceed the byte-size limit.
30         """
31         byte_length = len(data.encode("utf-8"))
32         with self._lock:
33             if self._bytes + byte_length > self._max_bytes:
34                 raise WebSocketQueueFullError("send queue is full, message discarded")
35             self._queue.append((data, byte_length))
36             self._bytes += byte_length
37
38     def flush_sync(self, send: typing.Callable[[str], object]) -> None:
39         """Send every queued message via *send*.
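
The size check in the listing counts encoded UTF-8 bytes rather than characters, so non-ASCII text fills the queue faster than its length suggests. The following is a minimal, self-contained sketch of that behavior, not the library's actual class: `SendQueueSketch`, its `max_bytes` constructor argument, the use of `collections.deque`, and the local `WebSocketQueueFullError` stand-in are all assumptions made for illustration. Only the body of `enqueue` follows the excerpt.

```python
import threading
from collections import deque


class WebSocketQueueFullError(Exception):
    """Local stand-in for the library's exception of the same name."""


class SendQueueSketch:
    """Hypothetical byte-bounded queue; only enqueue() mirrors the excerpt."""

    def __init__(self, max_bytes: int) -> None:
        # Assumed constructor: the excerpt does not show how these are set up.
        self._queue = deque()
        self._bytes = 0
        self._max_bytes = max_bytes
        self._lock = threading.Lock()

    def enqueue(self, data: str) -> None:
        # Size is measured in encoded UTF-8 bytes, not in characters.
        byte_length = len(data.encode("utf-8"))
        with self._lock:
            # Reject the message if it would push the total past the limit.
            if self._bytes + byte_length > self._max_bytes:
                raise WebSocketQueueFullError("send queue is full, message discarded")
            self._queue.append((data, byte_length))
            self._bytes += byte_length


queue = SendQueueSketch(max_bytes=10)
queue.enqueue("hello")  # 5 characters, 5 bytes
queue.enqueue("héé")    # 3 characters, but 5 bytes once encoded as UTF-8

try:
    queue.enqueue("x")  # one more byte would exceed the 10-byte limit
    rejected = False
except WebSocketQueueFullError:
    rejected = True
```

Because the check and the update happen under the same lock, a rejected message leaves the queue and its byte counter unchanged, so a caller can catch the error and decide whether to retry or drop the message.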

Callers 15

test_bool · Method · 0.95
test_flush_sync · Method · 0.95
test_flush_async · Method · 0.95
send · Method · 0.80
send_raw · Method · 0.80
send · Method · 0.80

Calls 2

append · Method · 0.45