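Return the WebSocket manager if the session is actively connected or, unless `require_connection` is set, known to the session store; otherwise raise a ValidationError.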
(session_id: str, *, require_connection: bool = False)
| 22 | |
| 23 | |
def ensure_known_session(session_id: str, *, require_connection: bool = False) -> WebSocketManager:
    """Fetch the WebSocket manager for a session that is live or remembered.

    Args:
        session_id: Identifier of the session to check.
        require_connection: When true, only membership in the manager's
            ``active_connections`` qualifies; the session store is not
            consulted.

    Returns:
        The object returned by ``get_websocket_manager()``.

    Raises:
        ValidationError: If ``session_id`` is falsy, or the session is not in
            ``active_connections`` and either ``require_connection`` is true
            or the session store does not have the session.
    """
    ws_manager = get_websocket_manager()

    # A falsy id can never match; skip both lookups and fall through to the error.
    if session_id:
        if session_id in ws_manager.active_connections:
            return ws_manager

        # The store is only a fallback when a live connection is not mandatory.
        store_lookup_allowed = not require_connection
        if store_lookup_allowed and ws_manager.session_store.has_session(session_id):
            return ws_manager

    raise ValidationError("Session not connected", details={"session_id": session_id})
no test coverage detected