(self, message: Dict[str, Any])
| 118 | asyncio.run(self.send_message(session_id, message)) |
| 119 | |
async def broadcast(self, message: Dict[str, Any]) -> None:
    """Send ``message`` to every session in ``self.active_connections``.

    The session ids are captured before the first send, and each
    ``send_message`` call is awaited to completion before the next one
    starts, so delivery is sequential in the mapping's iteration order.

    Args:
        message: Payload handed unchanged to ``send_message`` for each
            session.
    """
    # Walk a snapshot instead of the live mapping -- presumably because
    # connections can be added or dropped while a send is being awaited
    # (confirm against send_message).
    recipients = tuple(self.active_connections.keys())
    for recipient in recipients:
        await self.send_message(recipient, message)
| 123 | |
| 124 | async def handle_heartbeat(self, session_id: str) -> None: |
| 125 | if session_id in self.active_connections: |
nothing calls this directly
no test coverage detected