MCPcopy
hub / github.com/redis/redis-py / _wait_for_calls_async

Method _wait_for_calls_async

tests/test_background.py:22–30  ·  view source on GitHub ↗
(execute_counter, min_call_count, timeout)

Source from the content-addressed store, hash-verified

20
21 @staticmethod
22 async def _wait_for_calls_async(execute_counter, min_call_count, timeout):
23 if min_call_count == 0:
24 await asyncio.sleep(timeout)
25 return
26
27 loop = asyncio.get_running_loop()
28 deadline = loop.time() + max(timeout, 0.2)
29 while len(execute_counter) < min_call_count and loop.time() < deadline:
30 await asyncio.sleep(0.002)
31
32 def test_run_once(self):
33 execute_counter = 0

Callers 1

Calls 4

maxClass · 0.85
get_running_loopMethod · 0.80
sleepMethod · 0.45
timeMethod · 0.45

Tested by

no test coverage detected