Build the final completion string. Returns: Completion message text
(self)
| 117 | yaml.dump(summary, handle, allow_unicode=True, sort_keys=False) |
| 118 | |
def final_message(self) -> str:
    """Summarize the finished workflow in a single sentence.

    Returns:
        A fixed notice when ``self.outputs`` is empty (falsy); otherwise a
        sentence reporting the number of entries in ``self.outputs`` and
        the number of nodes in ``self.nodes`` that have no successors.
    """
    # Truthiness check (not a length comparison) so any falsy ``outputs``
    # value takes the short-circuit path.
    if not self.outputs:
        return "Workflow finished with no outputs."

    # A node counts as terminal when its ``successors`` collection is falsy.
    terminal_count = 0
    for node in self.nodes.values():
        if not node.successors:
            terminal_count += 1

    output_count = len(self.outputs)
    return (
        f"Workflow finished with {output_count} node outputs"
        f" ({terminal_count} terminal nodes)."
    )
| 133 | |
| 134 | def get_sink_nodes(self) -> List[Node]: |
| 135 | """Return all leaf nodes (nodes without successors).""" |
no test coverage detected