MCPcopy
hub / github.com/encode/uvicorn / MockTransport

Class MockTransport

tests/protocols/test_http.py:170–205  ·  tests/protocols/test_http.py::MockTransport

Source from the content-addressed store, hash-verified

168
169
170class MockTransport:
171 def __init__(
172 self, sockname: tuple[str, int] | None = None, peername: tuple[str, int] | None = None, sslcontext: bool = False
173 ):
174 self.sockname = (class="st">"127.0.0.1", 8000) if sockname is None else sockname
175 self.peername = (class="st">"127.0.0.1", 8001) if peername is None else peername
176 self.sslcontext = sslcontext
177 self.closed = False
178 self.buffer = bclass="st">""
179 self.read_paused = False
180
181 def get_extra_info(self, key: Any):
182 return {class="st">"sockname": self.sockname, class="st">"peername": self.peername, class="st">"sslcontext": self.sslcontext}.get(key)
183
184 def write(self, data: bytes):
185 assert not self.closed
186 self.buffer += data
187
188 def close(self):
189 assert not self.closed
190 self.closed = True
191
192 def pause_reading(self):
193 self.read_paused = True
194
195 def resume_reading(self):
196 self.read_paused = False
197
198 def is_closing(self):
199 return self.closed
200
201 def clear_buffer(self):
202 self.buffer = bclass="st">""
203
204 def set_protocol(self, protocol: asyncio.Protocol):
205 pass
206
207
208class MockTimerHandle:

Callers 1

get_connected_protocol · Function · 0.70

Calls

no outgoing calls

Tested by 1

get_connected_protocol · Function · 0.56