MCPcopy Index your code
hub / github.com/OpenBMB/ChatDev / __init__

Method __init__

workflow/graph.py:56–102  ·  view source on GitHub ↗

Initialize executor with graph context instance.

(
        self,
        graph: GraphContext,
        *,
        session_id: Optional[str] = None,
        workspace_hook_factory: Optional[Callable[[RuntimeContext], Any]] = None,
        cancel_event: Optional[threading.Event] = None,
    )

Source from the content-addressed store, hash-verified

54 """Executes ChatDev_new graph workflows with integrated memory and thinking management."""
55
def __init__(
    self,
    graph: GraphContext,
    *,
    session_id: Optional[str] = None,
    workspace_hook_factory: Optional[Callable[[RuntimeContext], Any]] = None,
    cancel_event: Optional[threading.Event] = None,
) -> None:
    """Set up executor state around the supplied graph context.

    Args:
        graph: Graph context this executor is bound to; also handed to
            ``RuntimeBuilder``.
        session_id: Optional identifier forwarded to ``RuntimeBuilder.build``.
        workspace_hook_factory: Optional callable invoked with the built
            runtime context; its return value is stored on that context
            as ``workspace_hook``.
        cancel_event: Optional event used for cancellation. When a falsy
            value is passed, a fresh ``threading.Event`` is created.
    """
    # Core state and logging.
    self.majority_result = None
    self.graph: GraphContext = graph
    self.outputs = {}
    self.logger = self._create_logger()

    # Cancellation signalling.
    self._cancel_event = cancel_event if cancel_event else threading.Event()
    self._cancel_reason: Optional[str] = None

    # Build the runtime context and optionally attach a workspace hook.
    builder = RuntimeBuilder(graph)
    runtime_ctx = builder.build(logger=self.logger, session_id=session_id)
    if workspace_hook_factory:
        runtime_ctx.workspace_hook = workspace_hook_factory(runtime_ctx)
    self.runtime_context = runtime_ctx

    # Managers exposed by the runtime context.
    self.tool_manager = runtime_ctx.tool_manager
    self.function_manager = runtime_ctx.function_manager
    self.edge_processor_function_manager = runtime_ctx.edge_processor_function_manager
    self.log_manager = runtime_ctx.log_manager
    self.resource_manager = ResourceManager(self.log_manager)

    # Token tracking
    self.token_tracker = runtime_ctx.token_tracker

    # Workspace roots
    self.code_workspace = runtime_ctx.code_workspace
    self.attachment_store = runtime_ctx.attachment_store

    # Memory and Thinking management (moved from Graph)
    self.thinking_managers: Dict[str, ThinkingManagerBase] = {}
    self.global_memories: Dict[str, MemoryBase] = {}
    self.agent_memory_managers: Dict[str, MemoryManager] = {}

    # Cycle management
    self.cycle_manager: Optional[CycleManager] = None

    # Node executors (new strategy pattern implementation)
    self.__execution_context: Optional[ExecutionContext] = None
    self.node_executors: Dict[str, Any] = {}
    self._human_prompt_service: Optional[HumanPromptService] = None

    # Starting messages kept for majority voting mode.
    self.initial_task_messages: List[Message] = []
103
104 def request_cancel(self, reason: Optional[str] = None) -> None:
105 """Signal the executor to stop as soon as possible."""

Callers

nothing calls this directly

Calls 4

_create_loggerMethod · 0.95
RuntimeBuilderClass · 0.90
ResourceManagerClass · 0.90
buildMethod · 0.45

Tested by

no test coverage detected