Execute an agent node. Args: node: Agent node definition inputs: Input messages collected from upstream nodes Returns: Agent output messages
(self, node: Node, inputs: List[Message])
| 41 | """Executor that runs agent nodes.""" |
| 42 | |
| 43 | def execute(self, node: Node, inputs: List[Message]) -> List[Message]: |
| 44 | """Execute an agent node. |
| 45 | |
| 46 | Args: |
| 47 | node: Agent node definition |
| 48 | inputs: Input messages collected from upstream nodes |
| 49 | |
| 50 | Returns: |
| 51 | Agent output messages |
| 52 | """ |
| 53 | self._ensure_not_cancelled() |
| 54 | if node.node_type != "agent": |
| 55 | raise ValueError(f"Node {node.id} is not an agent node") |
| 56 | |
| 57 | agent_config = node.as_config(AgentConfig) |
| 58 | if not agent_config: |
| 59 | raise ValueError(f"Node {node.id} missing agent config") |
| 60 | |
| 61 | try: |
| 62 | self._current_node_id = node.id |
| 63 | provider_class = ProviderRegistry.get_provider(agent_config.provider) |
| 64 | if not provider_class: |
| 65 | raise ValueError(f"Provider '{agent_config.provider}' not found") |
| 66 | |
| 67 | agent_config.token_tracker = self.context.get_token_tracker() |
| 68 | agent_config.node_id = node.id |
| 69 | |
| 70 | input_data = self._inputs_to_text(inputs) |
| 71 | input_payload = self._build_thinking_payload_from_inputs(inputs, input_data) |
| 72 | memory_query_snapshot = self._build_memory_query_snapshot(inputs, input_data) |
| 73 | input_mode = agent_config.input_mode or AgentInputMode.PROMPT |
| 74 | external_tool_specs = self.tool_manager.get_tool_specs(agent_config.tooling) |
| 75 | skill_manager = self._build_skill_manager(node, agent_config, external_tool_specs) |
| 76 | |
| 77 | provider = provider_class(agent_config) |
| 78 | client = provider.create_client() |
| 79 | |
| 80 | if input_mode is AgentInputMode.PROMPT: |
| 81 | conversation = self._prepare_prompt_messages(node, input_data, skill_manager) |
| 82 | else: |
| 83 | conversation = self._prepare_message_conversation(node, inputs, skill_manager) |
| 84 | call_options = self._prepare_call_options(node) |
| 85 | tool_specs = self._merge_skill_tool_specs(external_tool_specs, skill_manager) |
| 86 | |
| 87 | agent_invoker = self._build_agent_invoker( |
| 88 | provider, |
| 89 | client, |
| 90 | call_options, |
| 91 | tool_specs, |
| 92 | node, |
| 93 | ) |
| 94 | |
| 95 | if agent_config.thinking: |
| 96 | self._apply_pre_generation_thinking( |
| 97 | node, |
| 98 | conversation, |
| 99 | input_payload, |
| 100 | memory_query_snapshot, |
nothing calls this directly
no test coverage detected