Persist execution results to disk. Args: outputs: Mapping of node outputs
(self, outputs: Dict[str, Any])
| 91 | self.config.metadata = value |
| 92 | |
| 93 | def record(self, outputs: Dict[str, Any]) -> None: |
| 94 | """Persist execution results to disk. |
| 95 | |
| 96 | Args: |
| 97 | outputs: Mapping of node outputs |
| 98 | """ |
| 99 | self.outputs = outputs |
| 100 | # self.directory.mkdir(parents=True, exist_ok=True) |
| 101 | |
| 102 | # Persist node outputs |
| 103 | outputs_path = self.directory / "node_outputs.yaml" |
| 104 | if self.outputs: |
| 105 | with outputs_path.open("w", encoding="utf-8") as handle: |
| 106 | yaml.dump(self.outputs, handle, allow_unicode=True, sort_keys=False) |
| 107 | |
| 108 | # Persist workflow summary |
| 109 | summary = { |
| 110 | "project": self.config.name, |
| 111 | "organization": self.config.get_organization(), |
| 112 | "design_path": self.config.get_source_path(), |
| 113 | "metadata": self.config.metadata, |
| 114 | } |
| 115 | summary_path = self.directory / "workflow_summary.yaml" |
| 116 | with summary_path.open("w", encoding="utf-8") as handle: |
| 117 | yaml.dump(summary, handle, allow_unicode=True, sort_keys=False) |
| 118 | |
| 119 | def final_message(self) -> str: |
| 120 | """Build the final completion string. |
no test coverage detected