Execute an action on a dirty app. The action runs in a thread pool executor to avoid blocking the asyncio event loop. Execution timeout is enforced using ``dirty_timeout`` config. Args: app_path: Import path of the dirty app action:
(self, app_path, action, args, kwargs)
| 448 | await gen.aclose() |
| 449 | |
| 450 | async def execute(self, app_path, action, args, kwargs): |
| 451 | """ |
| 452 | Execute an action on a dirty app. |
| 453 | |
| 454 | The action runs in a thread pool executor to avoid blocking the |
| 455 | asyncio event loop. Execution timeout is enforced using |
| 456 | ``dirty_timeout`` config. |
| 457 | |
| 458 | Args: |
| 459 | app_path: Import path of the dirty app |
| 460 | action: Action name to execute |
| 461 | args: Positional arguments |
| 462 | kwargs: Keyword arguments |
| 463 | |
| 464 | Returns: |
| 465 | Result from the app action |
| 466 | |
| 467 | Raises: |
| 468 | DirtyAppNotFoundError: If app is not loaded |
| 469 | DirtyTimeoutError: If execution exceeds timeout |
| 470 | DirtyAppError: If execution fails |
| 471 | """ |
| 472 | if app_path not in self.apps: |
| 473 | raise DirtyAppNotFoundError(app_path) |
| 474 | |
| 475 | app = self.apps[app_path] |
| 476 | timeout = self.cfg.dirty_timeout if self.cfg.dirty_timeout > 0 else None |
| 477 | |
| 478 | # Run the app call in the thread pool to avoid blocking |
| 479 | # the event loop for CPU-bound operations |
| 480 | loop = asyncio.get_running_loop() |
| 481 | |
| 482 | try: |
| 483 | result = await asyncio.wait_for( |
| 484 | loop.run_in_executor( |
| 485 | self._executor, |
| 486 | lambda: app(action, *args, **kwargs) |
| 487 | ), |
| 488 | timeout=timeout |
| 489 | ) |
| 490 | return result |
| 491 | except asyncio.TimeoutError: |
| 492 | # Note: The thread continues running - we just stop waiting |
| 493 | self.log.warning( |
| 494 | "Execution timeout for %s.%s after %ds", |
| 495 | app_path, action, timeout |
| 496 | ) |
| 497 | raise DirtyTimeoutError( |
| 498 | f"Execution of {app_path}.{action} timed out", |
| 499 | timeout=timeout |
| 500 | ) |
| 501 | |
| 502 | def _cleanup(self): |
| 503 | """Clean up resources on shutdown.""" |