MCPcopy
hub / github.com/encode/uvicorn / MockLoop

Class MockLoop

tests/protocols/test_http.py:224–248  ·  tests/protocols/test_http.py::MockLoop

Source from the content-addressed store, hash-verified

222
223
224class MockLoop:
225 def __init__(self):
226 self._tasks: list[asyncio.Task[Any]] = []
227 self._later: list[MockTimerHandle] = []
228
229 def create_task(self, coroutine: Any) -> Any:
230 self._tasks.insert(0, coroutine)
231 return MockTask()
232
233 def call_later(self, delay: float, callback: Callable[[], None], *args: Any) -> MockTimerHandle:
234 handle = MockTimerHandle(self._later, delay, callback, args)
235 self._later.insert(0, handle)
236 return handle
237
238 async def run_one(self):
239 return await self._tasks.pop()
240
241 def run_later(self, with_delay: float) -> None:
242 later: list[MockTimerHandle] = []
243 for timer_handle in self._later:
244 if with_delay >= timer_handle.delay:
245 timer_handle.callback(*timer_handle.args)
246 else:
247 later.append(timer_handle)
248 self._later = later
249
250
251class MockTask:

Callers 1

get_connected_protocolFunction · 0.70

Calls

no outgoing calls

Tested by 1

get_connected_protocolFunction · 0.56