Execute an action on a dirty app (sync/blocking). Args: app_path: Import path of the dirty app (e.g., 'myapp.ml:MLApp') action: Action to call on the app *args: Positional arguments **kwargs: Keyword arguments Returns:
(self, app_path, action, *args, **kwargs)
| 78 | ) from e |
| 79 | |
| 80 | def execute(self, app_path, action, *args, **kwargs): |
| 81 | """ |
| 82 | Execute an action on a dirty app (sync/blocking). |
| 83 | |
| 84 | Args: |
| 85 | app_path: Import path of the dirty app (e.g., 'myapp.ml:MLApp') |
| 86 | action: Action to call on the app |
| 87 | *args: Positional arguments |
| 88 | **kwargs: Keyword arguments |
| 89 | |
| 90 | Returns: |
| 91 | Result from the dirty app action |
| 92 | |
| 93 | Raises: |
| 94 | DirtyConnectionError: If connection fails |
| 95 | DirtyTimeoutError: If operation times out |
| 96 | DirtyError: If execution fails |
| 97 | """ |
| 98 | with self._lock: |
| 99 | return self._execute_locked(app_path, action, args, kwargs) |
| 100 | |
| 101 | def _execute_locked(self, app_path, action, args, kwargs): |
| 102 | """Execute while holding the lock.""" |