(execute_counter, min_call_count, timeout)
| 20 | |
| 21 | @staticmethod |
| 22 | async def _wait_for_calls_async(execute_counter, min_call_count, timeout): |
| 23 | if min_call_count == 0: |
| 24 | await asyncio.sleep(timeout) |
| 25 | return |
| 26 | |
| 27 | loop = asyncio.get_running_loop() |
| 28 | deadline = loop.time() + max(timeout, 0.2) |
| 29 | while len(execute_counter) < min_call_count and loop.time() < deadline: |
| 30 | await asyncio.sleep(0.002) |
| 31 | |
| 32 | def test_run_once(self): |
| 33 | execute_counter = 0 |
no test coverage detected