Execute a subgraph node. Args: node: Subgraph node definition inputs: Input messages list Returns: Result produced by the subgraph
(self, node: Node, inputs: List[Message])
| 29 | self.subgraphs = subgraphs |
| 30 | |
| 31 | def execute(self, node: Node, inputs: List[Message]) -> List[Message]: |
| 32 | """Execute a subgraph node. |
| 33 | |
| 34 | Args: |
| 35 | node: Subgraph node definition |
| 36 | inputs: Input messages list |
| 37 | |
| 38 | Returns: |
| 39 | Result produced by the subgraph |
| 40 | """ |
| 41 | if node.node_type != "subgraph": |
| 42 | raise ValueError(f"Node {node.id} is not a subgraph node") |
| 43 | |
| 44 | subgraph_config = node.as_config(SubgraphConfig) |
| 45 | if not subgraph_config: |
| 46 | raise ValueError(f"Node {node.id} has no subgraph configuration") |
| 47 | |
| 48 | task_payload: List[Message] = self._clone_messages(inputs) |
| 49 | if not task_payload: |
| 50 | task_payload = [self._build_message(MessageRole.USER, "", source="SUBGRAPH")] |
| 51 | |
| 52 | input_data = self._inputs_to_text(task_payload) |
| 53 | |
| 54 | self.log_manager.debug( |
| 55 | f"Subgraph processing for node {node.id}", |
| 56 | node_id=node.id, |
| 57 | details={ |
| 58 | "input_size": len(str(input_data)), |
| 59 | "input_result": input_data |
| 60 | } |
| 61 | ) |
| 62 | |
| 63 | # Retrieve the subgraph context |
| 64 | if node.id not in self.subgraphs: |
| 65 | raise ValueError(f"Subgraph for node {node.id} not found") |
| 66 | |
| 67 | subgraph = self.subgraphs[node.id] |
| 68 | |
| 69 | # Deep copy the subgraph to ensure isolation during parallel execution |
| 70 | # process. Nodes in the subgraph (e.g. Start) hold state (inputs/outputs) |
| 71 | # that must not be shared across threads. |
| 72 | subgraph = copy.deepcopy(subgraph) |
| 73 | |
| 74 | # Execute the subgraph (requires importing ``GraphExecutor``) |
| 75 | from workflow.graph import GraphExecutor |
| 76 | |
| 77 | executor = GraphExecutor.execute_graph(subgraph, task_prompt=task_payload) |
| 78 | result_messages = executor.get_final_output_messages() |
| 79 | |
| 80 | final_results = [] |
| 81 | if not result_messages: |
| 82 | # Fallback for no output |
| 83 | fallback = self._build_message( |
| 84 | MessageRole.ASSISTANT, |
| 85 | "", |
| 86 | source=node.id, |
| 87 | ) |
| 88 | final_results.append(fallback) |
nothing calls this directly
no test coverage detected