Execute a node with dynamic configuration from incoming edges. Args: node: Target node to execute; inputs: All input messages collected for this node; dynamic_config: Dynamic configuration from the incoming edge. Returns: Output messages from dynamic execution.
(
self,
node: Node,
inputs: List[Message],
dynamic_config,
)
| 501 | return first_config |
| 502 | |
def _execute_with_dynamic_config(
    self,
    node: Node,
    inputs: List[Message],
    dynamic_config,
) -> List[Message]:
    """Run a node through a DynamicEdgeExecutor using an edge's dynamic config.

    The collected inputs are partitioned by the truthiness of each message's
    ``_from_dynamic_edge`` metadata entry. Flagged messages are passed to the
    executor as the dynamic inputs (the ones intended for splitting); all
    remaining messages are passed as ``static_inputs`` (the ones intended to
    be replicated to every execution unit).

    Args:
        node: Target node to execute.
        inputs: All input messages collected for this node.
        dynamic_config: Dynamic configuration from the incoming edge.

    Returns:
        Output messages from dynamic execution.
    """
    # Route every message into exactly one bucket, preserving arrival order.
    split_msgs: List[Message] = []
    shared_msgs: List[Message] = []
    for message in inputs:
        bucket = split_msgs if message.metadata.get("_from_dynamic_edge") else shared_msgs
        bucket.append(message)

    self.log_manager.info(
        f"Executing node {node.id} with edge dynamic config ({dynamic_config.type} mode): "
        f"{len(split_msgs)} dynamic inputs, {len(shared_msgs)} static inputs"
    )

    # Callback the executor uses to run a node on a given list of messages.
    def run_node(n: Node, inp: List[Message]) -> List[Message]:
        return self._process_result(n, inp)

    executor = DynamicEdgeExecutor(self.log_manager, run_node)

    # Flagged messages go in positionally; the rest ride along as static inputs.
    return executor.execute_from_inputs(
        node,
        split_msgs,
        dynamic_config,
        static_inputs=shared_msgs,
    )
| 546 | |
| 547 | def _execute_node(self, node: Node) -> None: |
| 548 | """Execute a single node.""" |
no test coverage detected