(
self,
input: Input,
config: Optional[RunnableConfig] = None,
**kwargs: Optional[Any],
)
| 2512 | return cast(Output, input) |
| 2513 | |
async def ainvoke(
    self,
    input: Input,
    config: Optional[RunnableConfig] = None,
    **kwargs: Optional[Any],
) -> Output:
    """Await every entry of ``self.steps`` in order under one root callback run.

    The result of each step becomes the input of the following step.

    Args:
        input: Value handed to the first step.
        config: Optional runnable config. It is normalised with
            ``ensure_config`` and then passed, together with ``self.steps``,
            through ``aconfig_with_context`` before use.
        **kwargs: Extra keyword arguments; forwarded to the first step only.

    Returns:
        The value produced by the last step (or ``input`` unchanged when
        ``self.steps`` is empty), cast to ``Output``.

    Raises:
        BaseException: Anything raised while running the steps is reported
            through ``run_manager.on_chain_error`` and then re-raised.
    """
    # Imported at call time rather than module level, as in the original.
    from langchain_core.beta.runnables.context import aconfig_with_context

    # Prepare the config (context wiring) and the async callback manager.
    config = aconfig_with_context(ensure_config(config), self.steps)
    callback_manager = get_async_callback_manager_for_config(config)

    # Open the root run. ``run_id`` is popped from the prepared config so it
    # is handed to the root run and no longer present on the config
    # afterwards.
    serialized = dumpd(self)
    root_name = config.get("run_name") or self.get_name()
    root_run_id = config.pop("run_id", None)
    run_manager = await callback_manager.on_chain_start(
        serialized,
        input,
        name=root_name,
        run_id=root_run_id,
    )

    # Thread the value through each step in turn.
    current = input
    try:
        for i, step in enumerate(self.steps):
            # Each step runs as a child of the root run, tagged with its
            # 1-based position. The config is re-patched cumulatively.
            child_callbacks = run_manager.get_child(f"seq:step:{i+1}")
            config = patch_config(config, callbacks=child_callbacks)
            # Only the first step receives the caller's keyword arguments.
            step_kwargs = kwargs if i == 0 else {}
            current = await step.ainvoke(current, config, **step_kwargs)
    except BaseException as exc:
        # Report the failure on the root run, then propagate it untouched.
        await run_manager.on_chain_error(exc)
        raise

    # Reached only when every step succeeded: close the root run.
    await run_manager.on_chain_end(current)
    return cast(Output, current)
| 2551 | |
| 2552 | def batch( |
| 2553 | self, |
nothing calls this directly
no test coverage detected