hub / github.com/OpenBMB/ChatDev / DynamicEdgeExecutor

Class DynamicEdgeExecutor

workflow/executor/dynamic_edge_executor.py:18–399

Execute edge-level dynamic expansion. When an edge has dynamic configuration, this executor:
1. Splits the payload passing through the edge
2. Executes the target node for each split unit
3. Collects and returns results (flat for Map, reduced for Tree)
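
The three steps above can be sketched in isolation. This is a minimal illustration, not the ChatDev implementation: `map_expand`, `tree_expand`, `split`, `run_node`, and `reduce_group` are hypothetical names, strings stand in for messages, and the pairwise reduction is only one plausible reading of "reduced for Tree".

```python
from typing import Callable, List


def map_expand(
    payload: str,
    split: Callable[[str], List[str]],
    run_node: Callable[[str], str],
) -> List[str]:
    """Map mode: run the node once per split unit and return the results flat."""
    return [run_node(unit) for unit in split(payload)]


def tree_expand(
    payload: str,
    split: Callable[[str], List[str]],
    run_node: Callable[[str], str],
    reduce_group: Callable[[List[str]], str],
) -> List[str]:
    """Tree mode (assumed): run per unit, then merge results in groups of two
    until at most one result remains."""
    level = map_expand(payload, split, run_node)
    while len(level) > 1:
        next_level = []
        for i in range(0, len(level), 2):
            group = level[i:i + 2]
            # A leftover single item is carried up unchanged.
            next_level.append(reduce_group(group) if len(group) > 1 else group[0])
        level = next_level
    return level


flat = map_expand("a b c", str.split, str.upper)
reduced = tree_expand("a b c", str.split, str.upper, lambda g: "+".join(g))
```

Map mode keeps one output per unit, while the tree variant folds them into a single result. Both return an empty list when the split produces no units.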


from typing import Callable, List, Optional

# Project-specific imports (LogManager, Node, Message, DynamicEdgeConfig,
# create_splitter_from_config) are not shown in this excerpt.


class DynamicEdgeExecutor:
    """Execute edge-level dynamic expansion.

    When an edge has dynamic configuration, this executor:
    1. Splits the payload passing through the edge
    2. Executes the target node for each split unit
    3. Collects and returns results (flat for Map, reduced for Tree)
    """

    def __init__(
        self,
        log_manager: LogManager,
        node_executor_func: Callable[[Node, List[Message]], List[Message]],
    ):
        """Initialize the dynamic edge executor.

        Args:
            log_manager: Logger instance
            node_executor_func: Function to execute a node with inputs
        """
        self.log_manager = log_manager
        self.node_executor_func = node_executor_func

    def execute(
        self,
        target_node: Node,
        payload: Message,
        dynamic_config: DynamicEdgeConfig,
        static_inputs: Optional[List[Message]] = None,
    ) -> List[Message]:
        """Execute dynamic expansion for an edge.

        Args:
            target_node: The node to execute (will be used as template)
            payload: The message passing through the edge
            dynamic_config: Edge dynamic configuration
            static_inputs: Optional static inputs from non-dynamic edges

        Returns:
            List of output messages from all executions
        """
        split_config = dynamic_config.split

        # Create splitter based on config
        splitter = create_splitter_from_config(split_config)

        # Split the payload into execution units
        execution_units = splitter.split([payload])

        if not execution_units:
            self.log_manager.debug(
                f"Dynamic edge -> {target_node.id}: no execution units after split"
            )
            return []

        self.log_manager.info(
            f"Dynamic edge -> {target_node.id}: splitting into {len(execution_units)} parallel units"
        )
        # ... (excerpt ends here; the class continues through line 399)
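
To make the `node_executor_func` contract and the empty-split guard concrete, here is a small self-contained sketch. Everything in it is a stand-in: `Node` and `Message` are simplified dataclasses rather than the project's types, `LineSplitter` and `run_units` are hypothetical, and the shape of an execution unit (a list of messages) is an assumption. The excerpt describes the units as parallel, but the sketch runs them sequentially for simplicity.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class Node:  # stand-in for the project's Node type
    id: str


@dataclass
class Message:  # stand-in for the project's Message type
    content: str


class LineSplitter:
    """Hypothetical splitter: one execution unit per non-blank line."""

    def split(self, messages: List[Message]) -> List[List[Message]]:
        return [
            [Message(line)]
            for message in messages
            for line in message.content.splitlines()
            if line.strip()
        ]


def run_units(
    node: Node,
    payload: Message,
    splitter: LineSplitter,
    node_executor_func: Callable[[Node, List[Message]], List[Message]],
) -> List[Message]:
    units = splitter.split([payload])
    if not units:
        # Mirrors the guard in the excerpt: nothing to run, return no outputs.
        return []
    outputs: List[Message] = []
    for unit in units:
        # The callback receives the target node and that unit's input messages.
        outputs.extend(node_executor_func(node, unit))
    return outputs


def echo(node: Node, inputs: List[Message]) -> List[Message]:
    """Toy node executor that tags each input with the node id."""
    return [Message(f"{node.id}:{m.content}") for m in inputs]


results = run_units(Node("n"), Message("a\n\nb"), LineSplitter(), echo)
```

Because the callback is injected, the same expansion logic can be exercised with a toy executor like `echo` without constructing a full workflow.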

Callers 1

Calls

no outgoing calls

Tested by

no test coverage detected