Run the tool.
(
self,
tool_input: Union[str, Dict[str, Any]],
verbose: Optional[bool] = None,
start_color: Optional[str] = "green",
color: Optional[str] = "green",
callbacks: Callbacks = None,
*,
tags: Optional[List[str]] = None,
metadata: Optional[Dict[str, Any]] = None,
run_name: Optional[str] = None,
run_id: Optional[uuid.UUID] = None,
config: Optional[RunnableConfig] = None,
**kwargs: Any,
)
| 353 | return (), tool_input |
| 354 | |
| 355 | def run( |
| 356 | self, |
| 357 | tool_input: Union[str, Dict[str, Any]], |
| 358 | verbose: Optional[bool] = None, |
| 359 | start_color: Optional[str] = "green", |
| 360 | color: Optional[str] = "green", |
| 361 | callbacks: Callbacks = None, |
| 362 | *, |
| 363 | tags: Optional[List[str]] = None, |
| 364 | metadata: Optional[Dict[str, Any]] = None, |
| 365 | run_name: Optional[str] = None, |
| 366 | run_id: Optional[uuid.UUID] = None, |
| 367 | config: Optional[RunnableConfig] = None, |
| 368 | **kwargs: Any, |
| 369 | ) -> Any: |
| 370 | """Run the tool.""" |
| 371 | if not self.verbose and verbose is not None: |
| 372 | verbose_ = verbose |
| 373 | else: |
| 374 | verbose_ = self.verbose |
| 375 | callback_manager = CallbackManager.configure( |
| 376 | callbacks, |
| 377 | self.callbacks, |
| 378 | verbose_, |
| 379 | tags, |
| 380 | self.tags, |
| 381 | metadata, |
| 382 | self.metadata, |
| 383 | ) |
| 384 | # TODO: maybe also pass through run_manager if _run supports kwargs |
| 385 | new_arg_supported = signature(self._run).parameters.get("run_manager") |
| 386 | run_manager = callback_manager.on_tool_start( |
| 387 | {"name": self.name, "description": self.description}, |
| 388 | tool_input if isinstance(tool_input, str) else str(tool_input), |
| 389 | color=start_color, |
| 390 | name=run_name, |
| 391 | run_id=run_id, |
| 392 | # Inputs by definition should always be dicts. |
| 393 | # For now, it's unclear whether this assumption is ever violated, |
| 394 | # but if it is, we will send a `None` value to the callback instead |
| 395 | # and will need to address the issue via a patch. |
| 396 | inputs=None if isinstance(tool_input, str) else tool_input, |
| 397 | **kwargs, |
| 398 | ) |
| 399 | try: |
| 400 | child_config = patch_config( |
| 401 | config, |
| 402 | callbacks=run_manager.get_child(), |
| 403 | ) |
| 404 | context = copy_context() |
| 405 | context.run(_set_config_context, child_config) |
| 406 | parsed_input = self._parse_input(tool_input) |
| 407 | tool_args, tool_kwargs = self._to_args_and_kwargs(parsed_input) |
| 408 | observation = ( |
| 409 | context.run( |
| 410 | self._run, *tool_args, run_manager=run_manager, **tool_kwargs |
| 411 | ) |
| 412 | if new_arg_supported |