MCPcopy
hub / github.com/OpenBMB/ChatDev / rename_workflow

Function rename_workflow

server/services/workflow_storage.py:111–164  ·  view source on GitHub ↗
(source_filename: str, target_filename: str, *, directory: Path)

Source from the content-addressed store, hash-verified

109
110
111def rename_workflow(source_filename: str, target_filename: str, *, directory: Path) -> None:
112 source_safe = validate_workflow_filename(source_filename, require_yaml_extension=True)
113 target_safe = validate_workflow_filename(target_filename, require_yaml_extension=True)
114
115 if source_safe == target_safe:
116 raise ValidationError("Source and target filenames must be different", field="new_filename")
117
118 source_path = directory / source_safe
119 target_path = directory / target_safe
120
121 if not source_path.exists() or not source_path.is_file():
122 raise ResourceNotFoundError(
123 "Workflow file not found",
124 resource_type="workflow",
125 resource_id=source_safe,
126 )
127
128 if target_path.exists():
129 raise ResourceConflictError(
130 "Target workflow already exists",
131 resource_type="workflow",
132 resource_id=target_safe,
133 )
134
135 logger = get_server_logger()
136 try:
137 source_path.rename(target_path)
138 except Exception as exc:
139 logger.log_exception(exc, f"Failed to rename workflow file {source_safe} to {target_safe}")
140 raise WorkflowExecutionError(
141 "Failed to rename workflow file",
142 details={"source": source_safe, "target": target_safe},
143 )
144
145 try:
146 new_workflow_id = Path(target_safe).stem
147 content = target_path.read_text(encoding="utf-8")
148 updated = _update_workflow_id(content, new_workflow_id)
149 if updated != content:
150 target_path.write_text(updated, encoding="utf-8")
151 except Exception as exc:
152 logger.log_exception(exc, f"Failed to update workflow id after rename to {target_safe}")
153 raise WorkflowExecutionError(
154 "Failed to update workflow id after rename",
155 details={"target": target_safe},
156 )
157
158 logger.info(
159 "Workflow file renamed",
160 log_type=LogType.WORKFLOW,
161 source=source_safe,
162 target=target_safe,
163 action="rename",
164 )
165
166
167def copy_workflow(source_filename: str, target_filename: str, *, directory: Path) -> None:

Callers 1

rename_workflow_fileFunction · 0.90

Calls 9

ValidationErrorClass · 0.90
get_server_loggerFunction · 0.90
_update_workflow_idFunction · 0.85
log_exceptionMethod · 0.80
infoMethod · 0.45

Tested by

no test coverage detected