Execute the graph using either its topological layer structure or cycle-aware execution.
(self, task_prompt: Any)
| 258 | memory.save() |
| 259 | |
| 260 | def run(self, task_prompt: Any) -> Dict[str, Any]: |
| 261 | """Execute the graph based on topological layers structure or cycle-aware execution.""" |
| 262 | self._raise_if_cancelled() |
| 263 | graph_manager = GraphManager(self.graph) |
| 264 | try: |
| 265 | graph_manager.build_graph() |
| 266 | except ConfigError as err: |
| 267 | error_msg = f"Graph configuration error: {str(err)}" |
| 268 | self.log_manager.logger.error(error_msg) |
| 269 | raise err |
| 270 | |
| 271 | self._prepare_edge_conditions() |
| 272 | |
| 273 | if not self.graph.layers: |
| 274 | raise ExecutionError("Graph not built. Call GraphManager.build_graph() first.") |
| 275 | |
| 276 | # Record workflow start |
| 277 | self.log_manager.record_workflow_start(self.graph.metadata) |
| 278 | |
| 279 | # Initialize memory and thinking before execution |
| 280 | self._build_memories_and_thinking() |
| 281 | |
| 282 | # Initialize cycle manager if graph has cycles |
| 283 | if self.graph.has_cycles: |
| 284 | self.cycle_manager = graph_manager.get_cycle_manager() |
| 285 | |
| 286 | self.initial_task_messages = [msg.clone() for msg in self._normalize_task_input(task_prompt)] |
| 287 | |
| 288 | start_node_ids = set(self.graph.start_nodes) |
| 289 | |
| 290 | # Reset all trigger states and initialize configured start nodes |
| 291 | for node_id, node in self.graph.nodes.items(): |
| 292 | self._raise_if_cancelled() |
| 293 | node.reset_triggers() |
| 294 | if node_id in start_node_ids: |
| 295 | node.start_triggered = True |
| 296 | node.clear_input() |
| 297 | for message in self.initial_task_messages: |
| 298 | node.append_input(message.clone()) |
| 299 | |
| 300 | # Execute based on graph type (using strategy objects) |
| 301 | if self.graph.is_majority_voting: |
| 302 | strategy = MajorityVoteStrategy( |
| 303 | log_manager=self.log_manager, |
| 304 | nodes=self.graph.nodes, |
| 305 | initial_messages=self.initial_task_messages, |
| 306 | execute_node_func=self._execute_node, |
| 307 | payload_to_text_func=self._payload_to_text, |
| 308 | ) |
| 309 | self.majority_result = strategy.run() |
| 310 | elif self.graph.has_cycles: |
| 311 | strategy = CycleExecutionStrategy( |
| 312 | log_manager=self.log_manager, |
| 313 | nodes=self.graph.nodes, |
| 314 | cycle_execution_order=self.graph.cycle_execution_order, |
| 315 | cycle_manager=self.cycle_manager, |
| 316 | execute_node_func=self._execute_node, |
| 317 | ) |
no test coverage detected