Method
watch_restarts
tests/conftest.py:256–268
· tests/conftest.py::TestServer.watch_restarts
(self)
Source from the content-addressed store, hash-verified
| 254 | await sleep(0.2) |
| 255 | |
| 256 | async def watch_restarts(self) -> None:  # pragma: no cover |
| 257 | while True: |
| 258 | if self.should_exit: |
| 259 | return |
| 260 | |
| 261 | try: |
| 262 | await asyncio.wait_for(self.restart_requested.wait(), timeout=0.1) |
| 263 | except asyncio.TimeoutError: |
| 264 | continue |
| 265 | |
| 266 | self.restart_requested.clear() |
| 267 | await self.shutdown() |
| 268 | await self.startup() |
| 269 | |
| 270 | |
| 271 | def serve_in_thread(server: TestServer) -> typing.Iterator[TestServer]: |
Tested by
no test coverage detected
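
The loop is marked `# pragma: no cover` and has no detected tests. A minimal sketch of how it could be exercised in isolation is shown here. `FakeServer` and `exercise` are hypothetical names, not part of the repository. The stand-in class copies the loop from the listing and records calls to `startup` and `shutdown` instead of starting a real server.

```python
import asyncio
from typing import List


class FakeServer:
    """Hypothetical stand-in for TestServer: only the attributes the loop touches."""

    def __init__(self) -> None:
        self.should_exit = False
        self.restart_requested = asyncio.Event()
        self.calls: List[str] = []

    async def startup(self) -> None:
        self.calls.append("startup")

    async def shutdown(self) -> None:
        self.calls.append("shutdown")

    async def watch_restarts(self) -> None:
        # Same control flow as the listing: poll the event with a short
        # timeout so that should_exit is re-checked roughly every 0.1 s.
        while True:
            if self.should_exit:
                return

            try:
                await asyncio.wait_for(self.restart_requested.wait(), timeout=0.1)
            except asyncio.TimeoutError:
                continue

            self.restart_requested.clear()
            await self.shutdown()
            await self.startup()


async def exercise() -> List[str]:
    server = FakeServer()
    watcher = asyncio.create_task(server.watch_restarts())

    # Request one restart, give the watcher time to handle it, then stop it.
    server.restart_requested.set()
    await asyncio.sleep(0.05)
    server.should_exit = True

    # The watcher notices should_exit after its next 0.1 s timeout.
    await asyncio.wait_for(watcher, timeout=1.0)
    return server.calls


if __name__ == "__main__":
    print(asyncio.run(exercise()))
```

Under these assumptions, one restart request produces one `shutdown` followed by one `startup`. The watcher exits within about one timeout interval of `should_exit` being set.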