Record the workflow end event.
record_workflow_end(self, success: bool = True,
duration: float = None, details: Dict[str, Any] = None) -> None
| 319 | ) |
| 320 | |
def record_workflow_end(self, success: bool = True,
                        duration: float = None, details: Dict[str, Any] = None) -> None:
    """Log the WORKFLOW_END event that closes out a workflow run.

    The entry is written through ``self.add_log`` with the message
    "Workflow execution completed". It is logged at ``LogLevel.INFO``
    when ``success`` is truthy and at ``LogLevel.ERROR`` otherwise.

    The event details always start with the ``success`` flag and
    ``total_logs`` (``len(self.logs)``, read before ``add_log`` is
    called). Caller-supplied ``details`` are merged in last, so on a key
    clash the caller's value wins.

    Args:
        success: Whether the workflow finished successfully.
        duration: Passed through unchanged as ``add_log``'s ``duration``.
        details: Optional extra key/value pairs to merge into the event
            details; ``None`` or an empty mapping adds nothing.
    """
    summary = {"success": success, "total_logs": len(self.logs)}
    extras = details or {}

    self.add_log(
        LogLevel.INFO if success else LogLevel.ERROR,
        "Workflow execution completed",
        event_type=EventType.WORKFLOW_END,
        # Unpack extras after summary so caller keys take precedence.
        details={**summary, **extras},
        duration=duration,
    )
| 338 | |
| 339 | def get_logs(self) -> List[Dict[str, Any]]: |
| 340 | """Return all log entries as dictionaries.""" |