MCPcopy Index your code
hub / github.com/OpenBMB/ChatDev / WebSocketManager

Class WebSocketManager

server/services/websocket_manager.py:41–147  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

39
40
class WebSocketManager:
    """Registry of accepted WebSocket connections, keyed by session id.

    Besides the connection table, the manager holds the session store,
    execution controller, attachment service, workflow-run service and
    message handler that operate on those sessions.
    """

    def __init__(
        self,
        *,
        session_store: WorkflowSessionStore | None = None,
        session_controller: SessionExecutionController | None = None,
        attachment_service: AttachmentService | None = None,
        workflow_run_service: WorkflowRunService | None = None,
    ):
        """Wire up the manager and its collaborators.

        Every argument is keyword-only and optional. A falsy argument is
        replaced by a default instance; defaults that depend on other
        collaborators are built from the ones already resolved.
        """
        # session id -> socket, and session id -> time.time() at connect.
        self.active_connections: Dict[str, WebSocket] = {}
        self.connection_timestamps: Dict[str, float] = {}

        store = session_store if session_store else WorkflowSessionStore()
        self.session_store = store

        controller = (
            session_controller
            if session_controller
            else SessionExecutionController(store)
        )
        self.session_controller = controller

        attachments = attachment_service if attachment_service else AttachmentService()
        self.attachment_service = attachments

        run_service = (
            workflow_run_service
            if workflow_run_service
            else WorkflowRunService(store, controller, attachments)
        )
        self.workflow_run_service = run_service

        self.message_handler = MessageHandler(store, controller, run_service)

    async def connect(self, websocket: WebSocket, session_id: Optional[str] = None) -> str:
        """Accept *websocket* and register it under a session id.

        When *session_id* is missing or empty a new UUID4 string is used.
        After registering the socket and its connect time, a ``connection``
        message carrying the id is sent through ``self.send_message``.

        Returns:
            The session id the connection was registered under.
        """
        await websocket.accept()

        sid = session_id or str(uuid.uuid4())
        self.active_connections[sid] = websocket
        self.connection_timestamps[sid] = time.time()
        logging.info("WebSocket connected: %s", sid)

        # Tell the client which session id it ended up with.
        greeting = {
            "type": "connection",
            "data": {"session_id": sid, "status": "connected"},
        }
        await self.send_message(sid, greeting)
        return sid

    def disconnect(self, session_id: str) -> None:
        """Drop the connection for *session_id* and clean up after it.

        A stored session that is still running or waiting for input gets a
        cancellation request first. The connection and timestamp entries are
        then removed (if present), the controller and attachment service are
        asked to clean up, and the stored session is popped once it has no
        executor.
        """
        # NOTE(review): the statement nesting below was reconstructed from a
        # listing that had lost its indentation - confirm against the
        # original file.
        current = self.session_store.get_session(session_id)
        if current:
            if current.status in {SessionStatus.RUNNING, SessionStatus.WAITING_FOR_INPUT}:
                self.workflow_run_service.request_cancel(
                    session_id,
                    reason="WebSocket disconnected",
                )

        # Removing an id that was never registered is a no-op.
        self.active_connections.pop(session_id, None)
        self.connection_timestamps.pop(session_id, None)

        self.session_controller.cleanup_session(session_id)

        # Looked up again rather than reusing `current`; presumably the steps
        # above can change or remove the stored session - TODO confirm.
        leftover = self.session_store.get_session(session_id)
        if leftover and leftover.executor is None:
            self.session_store.pop_session(session_id)

        self.attachment_service.cleanup_session(session_id)
        logging.info("WebSocket disconnected: %s", session_id)

Callers 1

get_websocket_manager — Function · 0.90

Calls

no outgoing calls

Tested by

no test coverage detected