MCPcopy
hub / github.com/redis/redis-py / BackgroundScheduler

Class BackgroundScheduler

redis/background.py:7–466  ·  view source on GitHub ↗

Schedules background tasks execution either in separate thread or in the running event loop.

Source from the content-addressed store, hash-verified

5
6
7class BackgroundScheduler:
8 """
9 Schedules background tasks execution either in separate thread or in the running event loop.
10 """
11
12 def __init__(self):
13 self._next_timer = None
14 self._event_loops = []
15 self._lock = threading.Lock()
16 self._stopped = False
17 # Dedicated loop for health checks - ensures all health checks use the same loop
18 self._health_check_loop: asyncio.AbstractEventLoop | None = None
19 self._health_check_thread: threading.Thread | None = None
20 # Event to signal when health check loop is ready
21 self._health_check_loop_ready = threading.Event()
22
23 def __del__(self):
24 self.stop()
25
26 def stop(self):
27 """
28 Stop all scheduled tasks and clean up resources.
29 """
30 with self._lock:
31 if self._stopped:
32 return
33 self._stopped = True
34
35 if self._next_timer:
36 self._next_timer.cancel()
37 self._next_timer = None
38
39 # Stop all event loops
40 for loop in self._event_loops:
41 if loop.is_running():
42 loop.call_soon_threadsafe(loop.stop)
43
44 self._event_loops.clear()
45
46 def run_once(self, delay: float, callback: Callable, *args):
47 """
48 Runs callable task once after certain delay in seconds.
49 """
50 with self._lock:
51 if self._stopped:
52 return
53
54 # Run loop in a separate thread to unblock main thread.
55 loop = asyncio.new_event_loop()
56
57 with self._lock:
58 self._event_loops.append(loop)
59
60 thread = threading.Thread(
61 target=_start_event_loop_in_thread,
62 args=(loop, self._call_later, delay, callback, *args),
63 daemon=True,
64 )

Calls

no outgoing calls