Execute a single node.
(self, node: Node)
| 545 | ) |
| 546 | |
| 547 | def _execute_node(self, node: Node) -> None: |
| 548 | """Execute a single node.""" |
| 549 | self._raise_if_cancelled() |
| 550 | with self.resource_manager.guard_node(node): |
| 551 | input_results = node.input |
| 552 | |
| 553 | # Clear incoming triggers so future iterations wait for fresh signals |
| 554 | node.reset_triggers() |
| 555 | |
| 556 | serialized_inputs = [message.to_dict(include_data=False) for message in input_results] |
| 557 | |
| 558 | # Record node start |
| 559 | self.log_manager.record_node_start(node.id, serialized_inputs, node.node_type, { |
| 560 | "input_count": len(input_results), |
| 561 | "predecessors": [p.id for p in node.predecessors], |
| 562 | "successors": [s.id for s in node.successors] |
| 563 | }) |
| 564 | |
| 565 | self.log_manager.debug(f"Processing {len(input_results)} inputs together for node {node.id}") |
| 566 | |
| 567 | # Check if any incoming edge has dynamic configuration |
| 568 | dynamic_config = self._get_dynamic_config_for_node(node) |
| 569 | |
| 570 | # Process all inputs together in a single executor call |
| 571 | with self.log_manager.node_timer(node.id): |
| 572 | if dynamic_config is not None: |
| 573 | raw_outputs = self._execute_with_dynamic_config(node, input_results, dynamic_config) |
| 574 | else: |
| 575 | raw_outputs = self._process_result(node, input_results) |
| 576 | |
| 577 | # Process all output messages |
| 578 | output_messages: List[Message] = [] |
| 579 | for raw_output in raw_outputs: |
| 580 | msg = self._ensure_source_output(raw_output, node.id) |
| 581 | node.append_output(msg) |
| 582 | output_messages.append(msg) |
| 583 | |
| 584 | # Use first output for context trace handling (backward compat) |
| 585 | unified_output = output_messages[0] if output_messages else None |
| 586 | |
| 587 | context_trace_payload = None |
| 588 | context_restored = False |
| 589 | if unified_output is not None and isinstance(unified_output.metadata, dict): |
| 590 | context_trace_payload = unified_output.metadata.get("context_trace") |
| 591 | if node.context_window != 0 and context_trace_payload: |
| 592 | context_restored = self._restore_context_trace(node, context_trace_payload) |
| 593 | |
| 594 | if node.context_window != -1: |
| 595 | preserved_inputs = node.clear_input(preserve_kept=True, context_window=node.context_window) |
| 596 | if preserved_inputs: |
| 597 | self.log_manager.debug( |
| 598 | f"Node {node.id} cleaned up its input context after execution (preserved {preserved_inputs} keep-marked inputs)" |
| 599 | ) |
| 600 | else: |
| 601 | self.log_manager.debug( |
| 602 | f"Node {node.id} cleaned up its input context after execution" |
| 603 | ) |
| 604 |
Nothing calls this method directly.
No test coverage was detected.