(
self,
inputs: List[Input],
config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
*,
return_exceptions: bool = False,
**kwargs: Optional[Any],
)
| 2550 | return cast(Output, input) |
| 2551 | |
| 2552 | def batch( |
| 2553 | self, |
| 2554 | inputs: List[Input], |
| 2555 | config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, |
| 2556 | *, |
| 2557 | return_exceptions: bool = False, |
| 2558 | **kwargs: Optional[Any], |
| 2559 | ) -> List[Output]: |
| 2560 | from langchain_core.beta.runnables.context import config_with_context |
| 2561 | from langchain_core.callbacks.manager import CallbackManager |
| 2562 | |
| 2563 | if not inputs: |
| 2564 | return [] |
| 2565 | |
| 2566 | # setup callbacks and context |
| 2567 | configs = [ |
| 2568 | config_with_context(c, self.steps) |
| 2569 | for c in get_config_list(config, len(inputs)) |
| 2570 | ] |
| 2571 | callback_managers = [ |
| 2572 | CallbackManager.configure( |
| 2573 | inheritable_callbacks=config.get("callbacks"), |
| 2574 | local_callbacks=None, |
| 2575 | verbose=False, |
| 2576 | inheritable_tags=config.get("tags"), |
| 2577 | local_tags=None, |
| 2578 | inheritable_metadata=config.get("metadata"), |
| 2579 | local_metadata=None, |
| 2580 | ) |
| 2581 | for config in configs |
| 2582 | ] |
| 2583 | # start the root runs, one per input |
| 2584 | run_managers = [ |
| 2585 | cm.on_chain_start( |
| 2586 | dumpd(self), |
| 2587 | input, |
| 2588 | name=config.get("run_name") or self.get_name(), |
| 2589 | run_id=config.pop("run_id", None), |
| 2590 | ) |
| 2591 | for cm, input, config in zip(callback_managers, inputs, configs) |
| 2592 | ] |
| 2593 | |
| 2594 | # invoke |
| 2595 | try: |
| 2596 | if return_exceptions: |
| 2597 | # Track which inputs (by index) failed so far |
| 2598 | # If an input has failed it will be present in this map, |
| 2599 | # and the value will be the exception that was raised. |
| 2600 | failed_inputs_map: Dict[int, Exception] = {} |
| 2601 | for stepidx, step in enumerate(self.steps): |
| 2602 | # Assemble the original indexes of the remaining inputs |
| 2603 | # (i.e. the ones that haven't failed yet) |
| 2604 | remaining_idxs = [ |
| 2605 | i for i in range(len(configs)) if i not in failed_inputs_map |
| 2606 | ] |
| 2607 | # Invoke the step on the remaining inputs |
| 2608 | inputs = step.batch( |
| 2609 | [ |
Nothing calls this directly.
No test coverage detected.