(source_filename: str, target_filename: str, *, directory: Path)
| 109 | |
| 110 | |
| 111 | def rename_workflow(source_filename: str, target_filename: str, *, directory: Path) -> None: |
| 112 | source_safe = validate_workflow_filename(source_filename, require_yaml_extension=True) |
| 113 | target_safe = validate_workflow_filename(target_filename, require_yaml_extension=True) |
| 114 | |
| 115 | if source_safe == target_safe: |
| 116 | raise ValidationError("Source and target filenames must be different", field="new_filename") |
| 117 | |
| 118 | source_path = directory / source_safe |
| 119 | target_path = directory / target_safe |
| 120 | |
| 121 | if not source_path.exists() or not source_path.is_file(): |
| 122 | raise ResourceNotFoundError( |
| 123 | "Workflow file not found", |
| 124 | resource_type="workflow", |
| 125 | resource_id=source_safe, |
| 126 | ) |
| 127 | |
| 128 | if target_path.exists(): |
| 129 | raise ResourceConflictError( |
| 130 | "Target workflow already exists", |
| 131 | resource_type="workflow", |
| 132 | resource_id=target_safe, |
| 133 | ) |
| 134 | |
| 135 | logger = get_server_logger() |
| 136 | try: |
| 137 | source_path.rename(target_path) |
| 138 | except Exception as exc: |
| 139 | logger.log_exception(exc, f"Failed to rename workflow file {source_safe} to {target_safe}") |
| 140 | raise WorkflowExecutionError( |
| 141 | "Failed to rename workflow file", |
| 142 | details={"source": source_safe, "target": target_safe}, |
| 143 | ) |
| 144 | |
| 145 | try: |
| 146 | new_workflow_id = Path(target_safe).stem |
| 147 | content = target_path.read_text(encoding="utf-8") |
| 148 | updated = _update_workflow_id(content, new_workflow_id) |
| 149 | if updated != content: |
| 150 | target_path.write_text(updated, encoding="utf-8") |
| 151 | except Exception as exc: |
| 152 | logger.log_exception(exc, f"Failed to update workflow id after rename to {target_safe}") |
| 153 | raise WorkflowExecutionError( |
| 154 | "Failed to update workflow id after rename", |
| 155 | details={"target": target_safe}, |
| 156 | ) |
| 157 | |
| 158 | logger.info( |
| 159 | "Workflow file renamed", |
| 160 | log_type=LogType.WORKFLOW, |
| 161 | source=source_safe, |
| 162 | target=target_safe, |
| 163 | action="rename", |
| 164 | ) |
| 165 | |
| 166 | |
| 167 | def copy_workflow(source_filename: str, target_filename: str, *, directory: Path) -> None: |
no test coverage detected