Execute a single topological layer.
(self, layer_nodes: List[str])
| 44 | self._execute_layer(layer_nodes) |
| 45 | |
| 46 | def _execute_layer(self, layer_nodes: List[str]) -> None: |
| 47 | """Execute a single topological layer.""" |
| 48 | def execute_if_triggered(node_id: str) -> None: |
| 49 | node = self.nodes[node_id] |
| 50 | if node.is_triggered(): |
| 51 | self.execute_node_func(node) |
| 52 | else: |
| 53 | self.log_manager.debug(f"Node {node_id} skipped - not triggered") |
| 54 | |
| 55 | self.parallel_executor.execute_nodes_parallel(layer_nodes, execute_if_triggered) |
no test coverage detected