MCPcopy Index your code
hub / github.com/OpenBMB/ChatDev / _execute_node

Method _execute_node

workflow/graph.py:547–655  ·  view source on GitHub ↗

Execute a single node.

(self, node: Node)

Source from the content-addressed store, hash-verified

545 )
546
547 def _execute_node(self, node: Node) -> None:
548 """Execute a single node."""
549 self._raise_if_cancelled()
550 with self.resource_manager.guard_node(node):
551 input_results = node.input
552
553 # Clear incoming triggers so future iterations wait for fresh signals
554 node.reset_triggers()
555
556 serialized_inputs = [message.to_dict(include_data=False) for message in input_results]
557
558 # Record node start
559 self.log_manager.record_node_start(node.id, serialized_inputs, node.node_type, {
560 "input_count": len(input_results),
561 "predecessors": [p.id for p in node.predecessors],
562 "successors": [s.id for s in node.successors]
563 })
564
565 self.log_manager.debug(f"Processing {len(input_results)} inputs together for node {node.id}")
566
567 # Check if any incoming edge has dynamic configuration
568 dynamic_config = self._get_dynamic_config_for_node(node)
569
570 # Process all inputs together in a single executor call
571 with self.log_manager.node_timer(node.id):
572 if dynamic_config is not None:
573 raw_outputs = self._execute_with_dynamic_config(node, input_results, dynamic_config)
574 else:
575 raw_outputs = self._process_result(node, input_results)
576
577 # Process all output messages
578 output_messages: List[Message] = []
579 for raw_output in raw_outputs:
580 msg = self._ensure_source_output(raw_output, node.id)
581 node.append_output(msg)
582 output_messages.append(msg)
583
584 # Use first output for context trace handling (backward compat)
585 unified_output = output_messages[0] if output_messages else None
586
587 context_trace_payload = None
588 context_restored = False
589 if unified_output is not None and isinstance(unified_output.metadata, dict):
590 context_trace_payload = unified_output.metadata.get("context_trace")
591 if node.context_window != 0 and context_trace_payload:
592 context_restored = self._restore_context_trace(node, context_trace_payload)
593
594 if node.context_window != -1:
595 preserved_inputs = node.clear_input(preserve_kept=True, context_window=node.context_window)
596 if preserved_inputs:
597 self.log_manager.debug(
598 f"Node {node.id} cleaned up its input context after execution (preserved {preserved_inputs} keep-marked inputs)"
599 )
600 else:
601 self.log_manager.debug(
602 f"Node {node.id} cleaned up its input context after execution"
603 )
604

Callers

nothing calls this directly

Calls 15

_raise_if_cancelled · Method · 0.95
_process_result · Method · 0.95
_ensure_source_output · Method · 0.95
_process_edge_output · Method · 0.95
EdgeLink · Class · 0.90
guard_node · Method · 0.80

Tested by

no test coverage detected