MCPcopy
hub / github.com/encode/httpx / watch_restarts

Method watch_restarts

tests/conftest.py:256–268  ·  tests/conftest.py::TestServer.watch_restarts
(self)

Source from the content-addressed store, hash-verified

254 await sleep(0.2)
255
256 async def watch_restarts(self) -> None: class="cm"># pragma: no cover
257 while True:
258 if self.should_exit:
259 return
260
261 try:
262 await asyncio.wait_for(self.restart_requested.wait(), timeout=0.1)
263 except asyncio.TimeoutError:
264 continue
265
266 self.restart_requested.clear()
267 await self.shutdown()
268 await self.startup()
269
270
271def serve_in_thread(server: TestServer) -> typing.Iterator[TestServer]:

Callers 1

serveMethod · 0.95

Calls 1

clearMethod · 0.80

Tested by

no test coverage detected