Manages graph construction, cycle detection, and execution order determination.
| 16 | |
| 17 | |
| 18 | class GraphManager: |
| 19 | """Manages graph construction, cycle detection, and execution order determination.""" |
| 20 | |
| 21 | def __init__(self, graph: "GraphContext") -> None: |
| 22 | """Initialize GraphManager with a GraphContext instance.""" |
| 23 | self.graph = graph |
| 24 | self.cycle_manager = CycleManager() |
| 25 | |
| 26 | def build_graph_structure(self) -> None: |
| 27 | """Build the complete graph structure including nodes, edges, and layers.""" |
| 28 | self._instantiate_nodes() |
| 29 | self._initiate_edges() |
| 30 | self._determine_start_nodes() |
| 31 | self._warn_on_untriggerable_nodes() |
| 32 | self._build_topology_and_metadata() |
| 33 | |
| 34 | def _instantiate_nodes(self) -> None: |
| 35 | """Instantiate all nodes from configuration.""" |
| 36 | self.graph.nodes.clear() |
| 37 | for node_def in self.graph.config.get_node_definitions(): |
| 38 | node_id = node_def.id |
| 39 | if node_id in self.graph.nodes: |
| 40 | print(f"Duplicated node id detected: {node_id}") |
| 41 | continue |
| 42 | node_instance = copy.deepcopy(node_def) |
| 43 | node_instance.predecessors = [] |
| 44 | node_instance.successors = [] |
| 45 | node_instance._outgoing_edges = [] |
| 46 | node_instance.vars = dict(self.graph.vars) |
| 47 | self.graph.nodes[node_id] = node_instance |
| 48 | |
| 49 | if node_instance.node_type == "subgraph": |
| 50 | self._build_subgraph(node_id) |
| 51 | |
| 52 | def _build_subgraph(self, node_id: str) -> None: |
| 53 | """Build a subgraph for the given node ID.""" |
| 54 | from entity.graph_config import GraphConfig |
| 55 | from workflow.graph_context import GraphContext |
| 56 | |
| 57 | subgraph_config_data = self.graph.nodes[node_id].as_config(SubgraphConfig) |
| 58 | if not subgraph_config_data: |
| 59 | return |
| 60 | |
| 61 | parent_source = self.graph.config.get_source_path() |
| 62 | subgraph_vars: Dict[str, Any] = {} |
| 63 | |
| 64 | if subgraph_config_data.type == "config": |
| 65 | inline_cfg = subgraph_config_data.as_config(SubgraphInlineConfig) |
| 66 | if not inline_cfg: |
| 67 | raise ConfigError( |
| 68 | f"Inline subgraph configuration missing for node '{node_id}'", |
| 69 | subgraph_config_data.path, |
| 70 | ) |
| 71 | config_payload = copy.deepcopy(inline_cfg.graph) |
| 72 | source_path = parent_source |
| 73 | elif subgraph_config_data.type == "file": |
| 74 | file_cfg = subgraph_config_data.as_config(SubgraphFileConfig) |
| 75 | if not file_cfg: |
no outgoing calls
no test coverage detected