Runs recurring coroutine with given interval in seconds in the current event loop. To be used only from an async context. No additional threads are created. Prevents overlapping executions by scheduling the next run only after the current one completes. Rai
(
self, interval: float, coro: Callable[..., Coroutine[Any, Any, Any]], *args
)
| 355 | ) |
| 356 | |
| 357 | async def run_recurring_async( |
| 358 | self, interval: float, coro: Callable[..., Coroutine[Any, Any, Any]], *args |
| 359 | ): |
| 360 | """ |
| 361 | Runs recurring coroutine with given interval in seconds in the current event loop. |
| 362 | To be used only from an async context. No additional threads are created. |
| 363 | |
| 364 | Prevents overlapping executions by scheduling the next run only after |
| 365 | the current one completes. |
| 366 | |
| 367 | Raises: |
| 368 | RuntimeError: If called without a running event loop (programming error) |
| 369 | """ |
| 370 | with self._lock: |
| 371 | if self._stopped: |
| 372 | return |
| 373 | |
| 374 | # This is an async method - it must be awaited in a running event loop. |
| 375 | # If get_running_loop() raises RuntimeError, let it propagate as that |
| 376 | # indicates a programming error (calling async method outside async context). |
| 377 | loop = asyncio.get_running_loop() |
| 378 | |
| 379 | def schedule_next(): |
| 380 | """Schedule the next execution after the current one completes.""" |
| 381 | with self._lock: |
| 382 | if self._stopped: |
| 383 | return |
| 384 | self._next_timer = loop.call_later(interval, execute_and_reschedule) |
| 385 | |
| 386 | def execute_and_reschedule(): |
| 387 | """Execute the coroutine and schedule next run after completion.""" |
| 388 | with self._lock: |
| 389 | if self._stopped: |
| 390 | return |
| 391 | |
| 392 | def on_complete(task: asyncio.Task): |
| 393 | """Callback when coroutine completes - schedule next execution.""" |
| 394 | # Log any exceptions (prevents "Task exception was never retrieved") |
| 395 | if task.cancelled(): |
| 396 | pass |
| 397 | elif task.exception() is not None: |
| 398 | logging.getLogger(__name__).debug( |
| 399 | "Recurring async coroutine raised exception", |
| 400 | exc_info=task.exception(), |
| 401 | ) |
| 402 | # Schedule next execution AFTER this one completes |
| 403 | schedule_next() |
| 404 | |
| 405 | try: |
| 406 | task = asyncio.ensure_future(coro(*args)) |
| 407 | task.add_done_callback(on_complete) |
| 408 | except Exception: |
| 409 | # If scheduling fails, still try to schedule next run |
| 410 | logging.getLogger(__name__).debug( |
| 411 | "Failed to schedule recurring async coroutine", exc_info=True |
| 412 | ) |
| 413 | schedule_next() |
| 414 |