MCPcopy Index your code
hub / github.com/OpenBMB/ChatDev / copy_workflow

Function copy_workflow

server/services/workflow_storage.py:167–207  ·  view source on GitHub ↗
(source_filename: str, target_filename: str, *, directory: Path)

Source from the content-addressed store, hash-verified

165
166
167def copy_workflow(source_filename: str, target_filename: str, *, directory: Path) -> None:
168 source_safe = validate_workflow_filename(source_filename, require_yaml_extension=True)
169 target_safe = validate_workflow_filename(target_filename, require_yaml_extension=True)
170
171 if source_safe == target_safe:
172 raise ValidationError("Source and target filenames must be different", field="new_filename")
173
174 source_path = directory / source_safe
175 target_path = directory / target_safe
176
177 if not source_path.exists() or not source_path.is_file():
178 raise ResourceNotFoundError(
179 "Workflow file not found",
180 resource_type="workflow",
181 resource_id=source_safe,
182 )
183
184 if target_path.exists():
185 raise ResourceConflictError(
186 "Target workflow already exists",
187 resource_type="workflow",
188 resource_id=target_safe,
189 )
190
191 logger = get_server_logger()
192 try:
193 target_path.write_text(source_path.read_text(encoding="utf-8"), encoding="utf-8")
194 except Exception as exc:
195 logger.log_exception(exc, f"Failed to copy workflow file {source_safe} to {target_safe}")
196 raise WorkflowExecutionError(
197 "Failed to copy workflow file",
198 details={"source": source_safe, "target": target_safe},
199 )
200
201 logger.info(
202 "Workflow file copied",
203 log_type=LogType.WORKFLOW,
204 source=source_safe,
205 target=target_safe,
206 action="copy",
207 )

Callers 1

copy_workflow_fileFunction · 0.90

Calls 8

ValidationErrorClass · 0.90
get_server_loggerFunction · 0.90
log_exceptionMethod · 0.80
infoMethod · 0.45

Tested by

no test coverage detected