(
self,
input: Input,
config: Optional[RunnableConfig] = None,
**kwargs: Optional[Any],
)
| 3157 | return output |
| 3158 | |
| 3159 | async def ainvoke( |
| 3160 | self, |
| 3161 | input: Input, |
| 3162 | config: Optional[RunnableConfig] = None, |
| 3163 | **kwargs: Optional[Any], |
| 3164 | ) -> Dict[str, Any]: |
| 3165 | # setup callbacks |
| 3166 | config = ensure_config(config) |
| 3167 | callback_manager = get_async_callback_manager_for_config(config) |
| 3168 | # start the root run |
| 3169 | run_manager = await callback_manager.on_chain_start( |
| 3170 | dumpd(self), |
| 3171 | input, |
| 3172 | name=config.get("run_name") or self.get_name(), |
| 3173 | run_id=config.pop("run_id", None), |
| 3174 | ) |
| 3175 | |
| 3176 | # gather results from all steps |
| 3177 | try: |
| 3178 | # copy to avoid issues from the caller mutating the steps during invoke() |
| 3179 | steps = dict(self.steps__) |
| 3180 | results = await asyncio.gather( |
| 3181 | *( |
| 3182 | step.ainvoke( |
| 3183 | input, |
| 3184 | # mark each step as a child run |
| 3185 | patch_config( |
| 3186 | config, callbacks=run_manager.get_child(f"map:key:{key}") |
| 3187 | ), |
| 3188 | ) |
| 3189 | for key, step in steps.items() |
| 3190 | ) |
| 3191 | ) |
| 3192 | output = {key: value for key, value in zip(steps, results)} |
| 3193 | # finish the root run |
| 3194 | except BaseException as e: |
| 3195 | await run_manager.on_chain_error(e) |
| 3196 | raise |
| 3197 | else: |
| 3198 | await run_manager.on_chain_end(output) |
| 3199 | return output |
| 3200 | |
| 3201 | def _transform( |
| 3202 | self, |
nothing calls this directly
no test coverage detected