MCPcopy
hub / github.com/redis/redis-py / _wait_for_calls

Method _wait_for_calls

tests/test_background.py:12–19  ·  view source on GitHub ↗
(execute_counter, min_call_count, timeout)

Source from the content-addressed store, hash-verified

10class TestBackgroundScheduler:
11 @staticmethod
12 def _wait_for_calls(execute_counter, min_call_count, timeout):
13 if min_call_count == 0:
14 sleep(timeout)
15 return
16
17 deadline = monotonic() + max(timeout, 0.2)
18 while len(execute_counter) < min_call_count and monotonic() < deadline:
19 sleep(0.002)
20
21 @staticmethod
22 async def _wait_for_calls_async(execute_counter, min_call_count, timeout):

Callers 2

test_run_recurringMethod · 0.95

Calls 1

maxClass · 0.85

Tested by

no test coverage detected