Handle an error for a chain run.
(
self,
error: BaseException,
*,
inputs: Optional[Dict[str, Any]] = None,
run_id: UUID,
**kwargs: Any,
)
| 218 | return chain_run |
| 219 | |
def on_chain_error(
    self,
    error: BaseException,
    *,
    inputs: Optional[Dict[str, Any]] = None,
    run_id: UUID,
    **kwargs: Any,
) -> Run:
    """Handle an error for a chain run.

    Obtains the run from ``self._errored_chain_run``, passes it to
    ``self._end_trace`` and then to ``self._on_chain_error``, in that
    order, and returns it.

    Args:
        error: The exception reported for the chain run.
        inputs: Optional inputs forwarded to ``_errored_chain_run``;
            defaults to ``None``.
        run_id: Identifier forwarded to ``_errored_chain_run``
            (presumably selects the run to mark as errored; confirm
            against that helper).
        **kwargs: Extra keyword arguments forwarded unchanged to
            ``_errored_chain_run``.

    Returns:
        The run object returned by ``_errored_chain_run``.
    """
    errored_run = self._errored_chain_run(
        error=error,
        run_id=run_id,
        inputs=inputs,
        **kwargs,
    )
    # Order matters: the trace is ended before the error hook is invoked.
    self._end_trace(errored_run)
    self._on_chain_error(errored_run)
    return errored_run
| 238 | |
| 239 | def on_tool_start( |
| 240 | self, |
no test coverage detected