(
self,
inputs: List[Input],
config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
*,
return_exceptions: bool = False,
**kwargs: Optional[Any],
)
| 2676 | raise first_exception |
| 2677 | |
| 2678 | async def abatch( |
| 2679 | self, |
| 2680 | inputs: List[Input], |
| 2681 | config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, |
| 2682 | *, |
| 2683 | return_exceptions: bool = False, |
| 2684 | **kwargs: Optional[Any], |
| 2685 | ) -> List[Output]: |
| 2686 | from langchain_core.beta.runnables.context import aconfig_with_context |
| 2687 | from langchain_core.callbacks.manager import AsyncCallbackManager |
| 2688 | |
| 2689 | if not inputs: |
| 2690 | return [] |
| 2691 | |
| 2692 | # setup callbacks and context |
| 2693 | configs = [ |
| 2694 | aconfig_with_context(c, self.steps) |
| 2695 | for c in get_config_list(config, len(inputs)) |
| 2696 | ] |
| 2697 | callback_managers = [ |
| 2698 | AsyncCallbackManager.configure( |
| 2699 | inheritable_callbacks=config.get("callbacks"), |
| 2700 | local_callbacks=None, |
| 2701 | verbose=False, |
| 2702 | inheritable_tags=config.get("tags"), |
| 2703 | local_tags=None, |
| 2704 | inheritable_metadata=config.get("metadata"), |
| 2705 | local_metadata=None, |
| 2706 | ) |
| 2707 | for config in configs |
| 2708 | ] |
| 2709 | # start the root runs, one per input |
| 2710 | run_managers: List[AsyncCallbackManagerForChainRun] = await asyncio.gather( |
| 2711 | *( |
| 2712 | cm.on_chain_start( |
| 2713 | dumpd(self), |
| 2714 | input, |
| 2715 | name=config.get("run_name") or self.get_name(), |
| 2716 | run_id=config.pop("run_id", None), |
| 2717 | ) |
| 2718 | for cm, input, config in zip(callback_managers, inputs, configs) |
| 2719 | ) |
| 2720 | ) |
| 2721 | |
| 2722 | # invoke .batch() on each step |
| 2723 | # this uses batching optimizations in Runnable subclasses, like LLM |
| 2724 | try: |
| 2725 | if return_exceptions: |
| 2726 | # Track which inputs (by index) failed so far |
| 2727 | # If an input has failed it will be present in this map, |
| 2728 | # and the value will be the exception that was raised. |
| 2729 | failed_inputs_map: Dict[int, Exception] = {} |
| 2730 | for stepidx, step in enumerate(self.steps): |
| 2731 | # Assemble the original indexes of the remaining inputs |
| 2732 | # (i.e. the ones that haven't failed yet) |
| 2733 | remaining_idxs = [ |
| 2734 | i for i in range(len(configs)) if i not in failed_inputs_map |
| 2735 | ] |
nothing calls this directly
no test coverage detected