MCPcopy
hub / github.com/openai/openai-python / ResponseStreamManager

Class ResponseStreamManager

src/openai/lib/streaming/responses/_responses.py:95–129  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

93
94
class ResponseStreamManager(Generic[TextFormatT]):
    """Context manager that issues the API request on entry and closes the stream on exit.

    Construction only records its arguments. The ``api_request`` callable is
    invoked when the ``with`` block is entered, and its result is wrapped in a
    ``ResponseStream`` that is handed to the caller.
    """

    def __init__(
        self,
        api_request: Callable[[], Stream[RawResponseStreamEvent]],
        *,
        text_format: type[TextFormatT] | Omit,
        input_tools: Iterable[ToolParam] | Omit,
        starting_after: int | None,
    ) -> None:
        """Record the request callable and the options forwarded to ``ResponseStream``.

        Args:
            api_request: Zero-argument callable returning the raw event stream;
                it is not called here.
            text_format: Passed through unchanged as ``ResponseStream(text_format=...)``.
            input_tools: Passed through unchanged as ``ResponseStream(input_tools=...)``.
            starting_after: Passed through unchanged as
                ``ResponseStream(starting_after=...)``.
        """
        self.__api_request = api_request
        self.__text_format = text_format
        self.__input_tools = input_tools
        self.__starting_after = starting_after
        # Set by __enter__; remains None if the manager is never entered.
        self.__stream: ResponseStream[TextFormatT] | None = None

    def __enter__(self) -> ResponseStream[TextFormatT]:
        """Invoke the stored request callable and return the wrapping ``ResponseStream``."""
        stream = ResponseStream(
            # The request callable runs first, before the stored options are read.
            raw_stream=self.__api_request(),
            text_format=self.__text_format,
            input_tools=self.__input_tools,
            starting_after=self.__starting_after,
        )
        # Keep a reference so __exit__ can close it.
        self.__stream = stream
        return stream

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Close the stream created by ``__enter__``, if there is one.

        The exception arguments are not inspected and nothing is returned, so
        an exception raised inside the ``with`` block is not suppressed.
        """
        stream = self.__stream
        if stream is None:
            # __enter__ never produced a stream, so there is nothing to close.
            return
        stream.close()
130
131
132class AsyncResponseStream(Generic[TextFormatT]):

Callers 1

streamMethod · 0.85

Calls

no outgoing calls

Tested by

no test coverage detected