hub / github.com/redis/redis-py / test_run_recurring_async

Method test_run_recurring_async

tests/test_background.py:96–118
(self, interval, timeout, min_call_count)

Source from the content-addressed store, hash-verified

 94          ],
 95      )
 96      async def test_run_recurring_async(self, interval, timeout, min_call_count):
 97          execute_counter = []
 98          one = "arg1"
 99          two = 9999
100
101          async def callback(arg1: str, arg2: int):
102              nonlocal execute_counter
103              nonlocal one
104              nonlocal two
105
106              execute_counter.append(1)
107
108              assert arg1 == one
109              assert arg2 == two
110
111          scheduler = BackgroundScheduler()
112          await scheduler.run_recurring_async(interval, callback, one, two)
113          assert len(execute_counter) == 0
114
115          await self._wait_for_calls_async(execute_counter, min_call_count, timeout)
116
117          # Use >= instead of == to account for timing variations on CI runners
118          assert len(execute_counter) >= min_call_count
119
120      @pytest.mark.parametrize(
121          "interval,timeout,min_call_count",
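
The test above delegates its waiting to `self._wait_for_calls_async`, whose body is not shown on this page. The following is a minimal sketch of how such a polling helper could work, assuming it polls the counter until the minimum call count is reached or the timeout expires. The names `wait_for_calls`, `poll_interval`, and `demo` are hypothetical, and the actual redis-py helper may be implemented differently.

```python
import asyncio
import time


async def wait_for_calls(counter, min_calls, timeout, poll_interval=0.01):
    """Hypothetical helper: poll until len(counter) >= min_calls or timeout elapses.

    Returns True if the minimum call count was reached, False on timeout.
    """
    deadline = time.monotonic() + timeout
    while len(counter) < min_calls and time.monotonic() < deadline:
        # Yield to the event loop so background callbacks can run.
        await asyncio.sleep(poll_interval)
    return len(counter) >= min_calls


async def demo():
    calls = []

    async def tick():
        # Stand-in for a recurring callback; this is not BackgroundScheduler.
        for _ in range(3):
            await asyncio.sleep(0.01)
            calls.append(1)

    task = asyncio.create_task(tick())
    reached = await wait_for_calls(calls, 2, timeout=1.0)
    await task  # let the stand-in finish so no task is left pending
    return reached, len(calls)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Polling against a deadline, rather than sleeping for a fixed duration, fits the `>=` assertion in the test: the wait ends as soon as enough calls have been observed, and extra calls that arrive on a slow runner do not cause a failure.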

Callers

nothing calls this directly

Calls 3

run_recurring_async · Method · 0.95
_wait_for_calls_async · Method · 0.95
BackgroundScheduler · Class · 0.90

Tested by

no test coverage detected