Default implementation runs invoke in parallel using a thread pool executor. The default implementation of batch works well for IO-bound runnables. Subclasses should override this method if they can batch more efficiently; e.g., if the underlying runnable uses an API which supports a batch mode.
(
self,
inputs: List[Input],
config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None,
*,
return_exceptions: bool = False,
**kwargs: Optional[Any],
)
| 606 | return await run_in_executor(config, self.invoke, input, config, **kwargs) |
| 607 | |
| 608 | def batch( |
| 609 | self, |
| 610 | inputs: List[Input], |
| 611 | config: Optional[Union[RunnableConfig, List[RunnableConfig]]] = None, |
| 612 | *, |
| 613 | return_exceptions: bool = False, |
| 614 | **kwargs: Optional[Any], |
| 615 | ) -> List[Output]: |
| 616 | """Default implementation runs invoke in parallel using a thread pool executor. |
| 617 | |
| 618 | The default implementation of batch works well for IO bound runnables. |
| 619 | |
| 620 | Subclasses should override this method if they can batch more efficiently; |
| 621 | e.g., if the underlying runnable uses an API which supports a batch mode. |
| 622 | """ |
| 623 | if not inputs: |
| 624 | return [] |
| 625 | |
| 626 | configs = get_config_list(config, len(inputs)) |
| 627 | |
| 628 | def invoke(input: Input, config: RunnableConfig) -> Union[Output, Exception]: |
| 629 | if return_exceptions: |
| 630 | try: |
| 631 | return self.invoke(input, config, **kwargs) |
| 632 | except Exception as e: |
| 633 | return e |
| 634 | else: |
| 635 | return self.invoke(input, config, **kwargs) |
| 636 | |
| 637 | # If there's only one input, don't bother with the executor |
| 638 | if len(inputs) == 1: |
| 639 | return cast(List[Output], [invoke(inputs[0], configs[0])]) |
| 640 | |
| 641 | with get_executor_for_config(configs[0]) as executor: |
| 642 | return cast(List[Output], list(executor.map(invoke, inputs, configs))) |
| 643 | |
| 644 | @overload |
| 645 | def batch_as_completed( |