MCPcopy
hub / github.com/redis/redis-py / run_recurring_async

Method run_recurring_async

redis/background.py:357–416  ·  view source on GitHub ↗

Runs recurring coroutine with given interval in seconds in the current event loop. To be used only from an async context. No additional threads are created. Prevents overlapping executions by scheduling the next run only after the current one completes. Raises RuntimeError if called without a running event loop (programming error).

(
        self, interval: float, coro: Callable[..., Coroutine[Any, Any, Any]], *args
    )

Source from the content-addressed store, hash-verified

355 )
356
357 async def run_recurring_async(
358 self, interval: float, coro: Callable[..., Coroutine[Any, Any, Any]], *args
359 ):
360 """
361 Runs recurring coroutine with given interval in seconds in the current event loop.
362 To be used only from an async context. No additional threads are created.
363
364 Prevents overlapping executions by scheduling the next run only after
365 the current one completes.
366
367 Raises:
368 RuntimeError: If called without a running event loop (programming error)
369 """
370 with self._lock:
371 if self._stopped:
372 return
373
374 # This is an async method - it must be awaited in a running event loop.
375 # If get_running_loop() raises RuntimeError, let it propagate as that
376 # indicates a programming error (calling async method outside async context).
377 loop = asyncio.get_running_loop()
378
379 def schedule_next():
380 """Schedule the next execution after the current one completes."""
381 with self._lock:
382 if self._stopped:
383 return
384 self._next_timer = loop.call_later(interval, execute_and_reschedule)
385
386 def execute_and_reschedule():
387 """Execute the coroutine and schedule next run after completion."""
388 with self._lock:
389 if self._stopped:
390 return
391
392 def on_complete(task: asyncio.Task):
393 """Callback when coroutine completes - schedule next execution."""
394 # Log any exceptions (prevents "Task exception was never retrieved")
395 if task.cancelled():
396 pass
397 elif task.exception() is not None:
398 logging.getLogger(__name__).debug(
399 "Recurring async coroutine raised exception",
400 exc_info=task.exception(),
401 )
402 # Schedule next execution AFTER this one completes
403 schedule_next()
404
405 try:
406 task = asyncio.ensure_future(coro(*args))
407 task.add_done_callback(on_complete)
408 except Exception:
409 # If scheduling fails, still try to schedule next run
410 logging.getLogger(__name__).debug(
411 "Failed to schedule recurring async coroutine", exc_info=True
412 )
413 schedule_next()
414

Callers 2

initialize · Method · 0.80

Calls 1

get_running_loop · Method · 0.80

Tested by 1