Record an edge-processing event.
(self, from_node: str, to_node: str,
details: Dict[str, Any] = None)
| 193 | ) |
| 194 | |
def record_edge_process(self, from_node: str, to_node: str,
                        details: Dict[str, Any] = None) -> None:
    """Record that the edge ``from_node -> to_node`` is being processed.

    The event is forwarded to ``self.debug`` with ``from_node`` as the
    ``node_id`` and ``EventType.EDGE_PROCESS`` as the event type.

    Args:
        from_node: Identifier of the node the edge starts at.
        to_node: Identifier of the node the edge points to; stored under
            the ``"to_node"`` key of the event details.
        details: Optional extra key/value pairs merged into the event
            details. A falsy value (``None`` or an empty mapping) adds
            nothing.
    """
    extra = details or {}
    # Caller-supplied entries are unpacked last, so a "to_node" key in
    # ``details`` replaces the value derived from the argument.
    event_details = {"to_node": to_node, **extra}
    message = f"Processing edge from {from_node} to {to_node}"
    self.debug(
        message,
        node_id=from_node,
        event_type=EventType.EDGE_PROCESS,
        details=event_details,
    )
| 207 | |
| 208 | def record_human_interaction(self, node_id: str, input_data: str = None, output: str = None, |
| 209 | duration: float = None, details: Dict[str, Any] = None) -> None: |
no test coverage detected