MCPcopy Index your code
hub / github.com/OpenBMB/ChatDev / GraphExecutor

Class GraphExecutor

workflow/graph.py:53–860  ·  view source on GitHub ↗

Executes ChatDev_new graph workflows with integrated memory and thinking management.

Source from the content-addressed store, hash-verified

51
52
53class GraphExecutor:
54 """Executes ChatDev_new graph workflows with integrated memory and thinking management."""
55
56 def __init__(
57 self,
58 graph: GraphContext,
59 *,
60 session_id: Optional[str] = None,
61 workspace_hook_factory: Optional[Callable[[RuntimeContext], Any]] = None,
62 cancel_event: Optional[threading.Event] = None,
63 ) -> None:
64 """Initialize executor with graph context instance."""
65 self.majority_result = None
66 self.graph: GraphContext = graph
67 self.outputs = {}
68 self.logger = self._create_logger()
69 self._cancel_event = cancel_event or threading.Event()
70 self._cancel_reason: Optional[str] = None
71 runtime = RuntimeBuilder(graph).build(logger=self.logger, session_id=session_id)
72 if workspace_hook_factory:
73 runtime.workspace_hook = workspace_hook_factory(runtime)
74 self.runtime_context = runtime
75 self.tool_manager = runtime.tool_manager
76 self.function_manager = runtime.function_manager
77 self.edge_processor_function_manager = runtime.edge_processor_function_manager
78 self.log_manager = runtime.log_manager
79 self.resource_manager = ResourceManager(self.log_manager)
80
81 # Memory and Thinking management (moved from Graph)
82 self.thinking_managers: Dict[str, ThinkingManagerBase] = {}
83 self.global_memories: Dict[str, MemoryBase] = {}
84 self.agent_memory_managers: Dict[str, MemoryManager] = {}
85
86 # Token tracking
87 self.token_tracker = runtime.token_tracker
88
89 # Workspace roots
90 self.code_workspace = runtime.code_workspace
91 self.attachment_store = runtime.attachment_store
92
93 # Cycle management
94 self.cycle_manager: Optional[CycleManager] = None
95
96 # Node executors (new strategy pattern implementation)
97 self.__execution_context: Optional[ExecutionContext] = None
98 self.node_executors: Dict[str, Any] = {}
99 self._human_prompt_service: Optional[HumanPromptService] = None
100
101 # for majority voting mode
102 self.initial_task_messages: List[Message] = []
103
104 def request_cancel(self, reason: Optional[str] = None) -> None:
105 """Signal the executor to stop as soon as possible."""
106 if reason:
107 self._cancel_reason = reason
108 elif not self._cancel_reason:
109 self._cancel_reason = "Workflow execution cancelled"
110 self._cancel_event.set()

Callers 1

_run_single_taskMethod · 0.90

Calls

no outgoing calls

Tested by

no test coverage detected