(filename: str, content: str)
| 63 | |
| 64 | |
| 65 | def validate_workflow_content(filename: str, content: str) -> Tuple[str, Any]: |
| 66 | safe_filename = validate_workflow_filename(filename, require_yaml_extension=True) |
| 67 | |
| 68 | try: |
| 69 | yaml_content = yaml.safe_load(content) |
| 70 | if yaml_content is None: |
| 71 | raise ValidationError("YAML content is empty", field="content") |
| 72 | |
| 73 | errors = check_config(yaml_content) |
| 74 | if errors: |
| 75 | raise ValidationError(f"YAML validation errors:\n{errors}", field="content") |
| 76 | except yaml.YAMLError as exc: |
| 77 | logger = get_server_logger() |
| 78 | logger.warning("Invalid YAML content in upload", details={"error": str(exc)}) |
| 79 | raise ValidationError(f"Invalid YAML syntax: {exc}", field="content") |
| 80 | |
| 81 | return safe_filename, yaml_content |
| 82 | |
| 83 | |
| 84 | def persist_workflow( |
no test coverage detected