Executes ChatDev_new graph workflows with integrated memory and thinking management.
| 51 | |
| 52 | |
| 53 | class GraphExecutor: |
| 54 | """Executes ChatDev_new graph workflows with integrated memory and thinking management.""" |
| 55 | |
| 56 | def __init__( |
| 57 | self, |
| 58 | graph: GraphContext, |
| 59 | *, |
| 60 | session_id: Optional[str] = None, |
| 61 | workspace_hook_factory: Optional[Callable[[RuntimeContext], Any]] = None, |
| 62 | cancel_event: Optional[threading.Event] = None, |
| 63 | ) -> None: |
| 64 | """Initialize executor with graph context instance.""" |
| 65 | self.majority_result = None |
| 66 | self.graph: GraphContext = graph |
| 67 | self.outputs = {} |
| 68 | self.logger = self._create_logger() |
| 69 | self._cancel_event = cancel_event or threading.Event() |
| 70 | self._cancel_reason: Optional[str] = None |
| 71 | runtime = RuntimeBuilder(graph).build(logger=self.logger, session_id=session_id) |
| 72 | if workspace_hook_factory: |
| 73 | runtime.workspace_hook = workspace_hook_factory(runtime) |
| 74 | self.runtime_context = runtime |
| 75 | self.tool_manager = runtime.tool_manager |
| 76 | self.function_manager = runtime.function_manager |
| 77 | self.edge_processor_function_manager = runtime.edge_processor_function_manager |
| 78 | self.log_manager = runtime.log_manager |
| 79 | self.resource_manager = ResourceManager(self.log_manager) |
| 80 | |
| 81 | # Memory and Thinking management (moved from Graph) |
| 82 | self.thinking_managers: Dict[str, ThinkingManagerBase] = {} |
| 83 | self.global_memories: Dict[str, MemoryBase] = {} |
| 84 | self.agent_memory_managers: Dict[str, MemoryManager] = {} |
| 85 | |
| 86 | # Token tracking |
| 87 | self.token_tracker = runtime.token_tracker |
| 88 | |
| 89 | # Workspace roots |
| 90 | self.code_workspace = runtime.code_workspace |
| 91 | self.attachment_store = runtime.attachment_store |
| 92 | |
| 93 | # Cycle management |
| 94 | self.cycle_manager: Optional[CycleManager] = None |
| 95 | |
| 96 | # Node executors (new strategy pattern implementation) |
| 97 | self.__execution_context: Optional[ExecutionContext] = None |
| 98 | self.node_executors: Dict[str, Any] = {} |
| 99 | self._human_prompt_service: Optional[HumanPromptService] = None |
| 100 | |
| 101 | # for majority voting mode |
| 102 | self.initial_task_messages: List[Message] = [] |
| 103 | |
| 104 | def request_cancel(self, reason: Optional[str] = None) -> None: |
| 105 | """Signal the executor to stop as soon as possible.""" |
| 106 | if reason: |
| 107 | self._cancel_reason = reason |
| 108 | elif not self._cancel_reason: |
| 109 | self._cancel_reason = "Workflow execution cancelled" |
| 110 | self._cancel_event.set() |