(
self, input: Input, config: Optional[RunnableConfig] = None
)
| 3107 | return "{\n " + map_for_repr + "\n}" |
| 3108 | |
def invoke(
    self, input: Input, config: Optional[RunnableConfig] = None
) -> Dict[str, Any]:
    """Invoke every step with the same ``input`` and collect the results.

    Each entry of ``self.steps__`` is submitted to the executor obtained
    from ``get_executor_for_config(config)``; the value each step returns
    is stored in the output under that step's key.

    Args:
        input: Value handed unchanged to every step's ``invoke``.
        config: Optional run configuration; normalised with
            ``ensure_config`` before use.

    Returns:
        A dict mapping each step key to the result of that step.

    Raises:
        BaseException: Whatever is raised while submitting or collecting
            the steps is reported through ``on_chain_error`` on the root
            run manager and then re-raised unchanged.
    """
    from langchain_core.callbacks.manager import CallbackManager

    config = ensure_config(config)

    # Build the callback manager from the inheritable parts of the config
    # and open the root run that the per-step child runs hang off.
    root_run = CallbackManager.configure(
        inheritable_callbacks=config.get("callbacks"),
        local_callbacks=None,
        verbose=False,
        inheritable_tags=config.get("tags"),
        local_tags=None,
        inheritable_metadata=config.get("metadata"),
        local_metadata=None,
    ).on_chain_start(
        dumpd(self),
        input,
        name=config.get("run_name") or self.get_name(),
        # run_id is removed from the config here, so the patched child
        # configs built below no longer carry it.
        run_id=config.pop("run_id", None),
    )

    try:
        # Snapshot the mapping so a caller mutating the steps while
        # invoke() is running cannot affect this call.
        branches = dict(self.steps__)
        with get_executor_for_config(config) as pool:
            pending = []
            for branch_key, runnable in branches.items():
                # Each step runs as a child of the root run, tagged by key.
                child_config = patch_config(
                    config,
                    callbacks=root_run.get_child(f"map:key:{branch_key}"),
                )
                pending.append(pool.submit(runnable.invoke, input, child_config))

            # Futures were appended in key order, so zip pairs them back up.
            results = {}
            for branch_key, future in zip(branches, pending):
                results[branch_key] = future.result()
    except BaseException as exc:
        root_run.on_chain_error(exc)
        raise

    # Only reached when every step succeeded: close the root run.
    root_run.on_chain_end(results)
    return results
| 3158 | |
| 3159 | async def ainvoke( |
| 3160 | self, |
nothing calls this directly
no test coverage detected