(self, session_id: str, message: Dict[str, Any])
| 108 | # self.disconnect(session_id) |
| 109 | |
def send_message_sync(self, session_id: str, message: Dict[str, Any]) -> None:
    """Send ``message`` to ``session_id`` from synchronous code.

    If an event loop is already running in the current thread, the send is
    scheduled on that loop as a task and this method returns immediately,
    without waiting for the send to finish. Otherwise the send is driven to
    completion with ``asyncio.run`` before returning.

    Args:
        session_id: Identifier passed through to ``self.send_message``.
        message: Payload passed through to ``self.send_message``.
    """
    # Only the loop lookup is guarded: get_running_loop() raises RuntimeError
    # exactly when no loop is running in this thread, so a returned loop is
    # always running and needs no further is_running() check.
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        asyncio.run(self.send_message(session_id, message))
        return

    task = loop.create_task(self.send_message(session_id, message))

    # The event loop keeps only weak references to tasks, so hold a strong
    # reference until the task finishes; otherwise a fire-and-forget send
    # can be garbage-collected mid-execution.
    pending = getattr(self, "_pending_sends", None)
    if pending is None:
        pending = self._pending_sends = set()
    pending.add(task)
    task.add_done_callback(pending.discard)
| 119 | |
| 120 | async def broadcast(self, message: Dict[str, Any]) -> None: |
| 121 | for session_id in list(self.active_connections.keys()): |
no test coverage detected