Execute edge-level dynamic expansion. When an edge has dynamic configuration, this executor: 1. Splits the payload passing through the edge 2. Executes the target node for each split unit 3. Collects and returns results (flat for Map, reduced for Tree)
| 16 | |
| 17 | |
| 18 | class DynamicEdgeExecutor: |
| 19 | """Execute edge-level dynamic expansion. |
| 20 | |
| 21 | When an edge has dynamic configuration, this executor: |
| 22 | 1. Splits the payload passing through the edge |
| 23 | 2. Executes the target node for each split unit |
| 24 | 3. Collects and returns results (flat for Map, reduced for Tree) |
| 25 | """ |
| 26 | |
| 27 | def __init__( |
| 28 | self, |
| 29 | log_manager: LogManager, |
| 30 | node_executor_func: Callable[[Node, List[Message]], List[Message]], |
| 31 | ): |
| 32 | """Initialize the dynamic edge executor. |
| 33 | |
| 34 | Args: |
| 35 | log_manager: Logger instance |
| 36 | node_executor_func: Function to execute a node with inputs |
| 37 | """ |
| 38 | self.log_manager = log_manager |
| 39 | self.node_executor_func = node_executor_func |
| 40 | |
| 41 | def execute( |
| 42 | self, |
| 43 | target_node: Node, |
| 44 | payload: Message, |
| 45 | dynamic_config: DynamicEdgeConfig, |
| 46 | static_inputs: Optional[List[Message]] = None, |
| 47 | ) -> List[Message]: |
| 48 | """Execute dynamic expansion for an edge. |
| 49 | |
| 50 | Args: |
| 51 | target_node: The node to execute (will be used as template) |
| 52 | payload: The message passing through the edge |
| 53 | dynamic_config: Edge dynamic configuration |
| 54 | static_inputs: Optional static inputs from non-dynamic edges |
| 55 | |
| 56 | Returns: |
| 57 | List of output messages from all executions |
| 58 | """ |
| 59 | split_config = dynamic_config.split |
| 60 | |
| 61 | # Create splitter based on config |
| 62 | splitter = create_splitter_from_config(split_config) |
| 63 | |
| 64 | # Split the payload into execution units |
| 65 | execution_units = splitter.split([payload]) |
| 66 | |
| 67 | if not execution_units: |
| 68 | self.log_manager.debug( |
| 69 | f"Dynamic edge -> {target_node.id}: no execution units after split" |
| 70 | ) |
| 71 | return [] |
| 72 | |
| 73 | self.log_manager.info( |
| 74 | f"Dynamic edge -> {target_node.id}: splitting into {len(execution_units)} parallel units" |
| 75 | ) |
no outgoing calls
no test coverage detected