Emit warnings for nodes that cannot be triggered by any predecessor.
(self)
| 319 | self.graph.explicit_start_nodes = explicit_ordered |
| 320 | |
| 321 | def _warn_on_untriggerable_nodes(self) -> None: |
| 322 | """Emit warnings for nodes that cannot be triggered by any predecessor.""" |
| 323 | start_nodes = set(self.graph.start_nodes or []) |
| 324 | for node_id, node in self.graph.nodes.items(): |
| 325 | if not node.predecessors: |
| 326 | continue |
| 327 | if node_id in start_nodes: |
| 328 | continue |
| 329 | |
| 330 | has_triggerable_edge = False |
| 331 | for predecessor in node.predecessors: |
| 332 | for edge_link in predecessor.iter_outgoing_edges(): |
| 333 | if edge_link.target is node and edge_link.trigger: |
| 334 | has_triggerable_edge = True |
| 335 | break |
| 336 | if has_triggerable_edge: |
| 337 | break |
| 338 | |
| 339 | if not has_triggerable_edge: |
| 340 | print( |
| 341 | f"Warning: node '{node_id}' has no triggerable incoming edges and will never execute." |
| 342 | ) |
| 343 | |
| 344 | def get_cycle_manager(self) -> CycleManager: |
| 345 | """Get the cycle manager instance.""" |
no test coverage detected