MCP · copy
hub / github.com/langchain-ai/langchain / abatch

Method abatch

libs/core/langchain_core/runnables/base.py:2678–2805  ·  view source on GitHub ↗
(
        self,
        inputs: List[Input],
        config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
        *,
        return_exceptions: bool = False,
        **kwargs: Optional[Any],
    )

Source from the content-addressed store, hash-verified

2676 raise first_exception
2677
2678 async def abatch(
2679 self,
2680 inputs: List[Input],
2681 config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
2682 *,
2683 return_exceptions: bool = False,
2684 **kwargs: Optional[Any],
2685 ) -> List[Output]:
2686 from langchain_core.beta.runnables.context import aconfig_with_context
2687 from langchain_core.callbacks.manager import AsyncCallbackManager
2688
2689 if not inputs:
2690 return []
2691
2692 # setup callbacks and context
2693 configs = [
2694 aconfig_with_context(c, self.steps)
2695 for c in get_config_list(config, len(inputs))
2696 ]
2697 callback_managers = [
2698 AsyncCallbackManager.configure(
2699 inheritable_callbacks=config.get("callbacks"),
2700 local_callbacks=None,
2701 verbose=False,
2702 inheritable_tags=config.get("tags"),
2703 local_tags=None,
2704 inheritable_metadata=config.get("metadata"),
2705 local_metadata=None,
2706 )
2707 for config in configs
2708 ]
2709 # start the root runs, one per input
2710 run_managers: List[AsyncCallbackManagerForChainRun] = await asyncio.gather(
2711 *(
2712 cm.on_chain_start(
2713 dumpd(self),
2714 input,
2715 name=config.get("run_name") or self.get_name(),
2716 run_id=config.pop("run_id", None),
2717 )
2718 for cm, input, config in zip(callback_managers, inputs, configs)
2719 )
2720 )
2721
2722 # invoke .batch() on each step
2723 # this uses batching optimizations in Runnable subclasses, like LLM
2724 try:
2725 if return_exceptions:
2726 # Track which inputs (by index) failed so far
2727 # If an input has failed it will be present in this map,
2728 # and the value will be the exception that was raised.
2729 failed_inputs_map: Dict[int, Exception] = {}
2730 for stepidx, step in enumerate(self.steps):
2731 # Assemble the original indexes of the remaining inputs
2732 # (i.e. the ones that haven't failed yet)
2733 remaining_idxs = [
2734 i for i in range(len(configs)) if i not in failed_inputs_map
2735 ]

Callers

nothing calls this directly

Calls 15

aconfig_with_context — Function · 0.90
get_config_list — Function · 0.90
dumpd — Function · 0.90
patch_config — Function · 0.90
pop — Method · 0.80
append — Method · 0.80
configure — Method · 0.45
get — Method · 0.45
on_chain_start — Method · 0.45
get_name — Method · 0.45
abatch — Method · 0.45
get_child — Method · 0.45

Tested by

no test coverage detected