Get the workers class attribute from a dirty app without instantiating it. This is used by the arbiter to determine how many workers should load an app based on the class attribute, without needing to actually load the app. Args: import_path: String in format 'module.p
(import_path)
| 298 | |
| 299 | |
| 300 | def get_app_workers_attribute(import_path): |
| 301 | """ |
| 302 | Get the workers class attribute from a dirty app without instantiating it. |
| 303 | |
| 304 | This is used by the arbiter to determine how many workers should load |
| 305 | an app based on the class attribute, without needing to actually load |
| 306 | the app. |
| 307 | |
| 308 | Args: |
| 309 | import_path: String in format 'module.path:ClassName' |
| 310 | |
| 311 | Returns: |
| 312 | The workers class attribute value (int or None) |
| 313 | |
| 314 | Raises: |
| 315 | DirtyAppNotFoundError: If the module or class cannot be found |
| 316 | DirtyAppError: If the import path format is invalid |
| 317 | """ |
| 318 | if ':' not in import_path: |
| 319 | raise DirtyAppError( |
| 320 | f"Invalid import path format: {import_path}. " |
| 321 | f"Expected 'module.path:ClassName'", |
| 322 | app_path=import_path |
| 323 | ) |
| 324 | |
| 325 | module_path, class_name = import_path.rsplit(':', 1) |
| 326 | |
| 327 | try: |
| 328 | # Import the module |
| 329 | if module_path in sys.modules: |
| 330 | module = sys.modules[module_path] |
| 331 | else: |
| 332 | module = importlib.import_module(module_path) |
| 333 | except ImportError as e: |
| 334 | raise DirtyAppNotFoundError(import_path) from e |
| 335 | |
| 336 | # Get the class from the module |
| 337 | try: |
| 338 | app_class = getattr(module, class_name) |
| 339 | except AttributeError: |
| 340 | raise DirtyAppNotFoundError(import_path) from None |
| 341 | |
| 342 | # Validate it's a class |
| 343 | if not isinstance(app_class, type): |
| 344 | raise DirtyAppError( |
| 345 | f"{import_path} is not a class", |
| 346 | app_path=import_path |
| 347 | ) |
| 348 | |
| 349 | # Return the workers attribute (defaults to None if not set) |
| 350 | return getattr(app_class, 'workers', None) |