(self, event: events.CloseConnection)
| 243 | self.transport.pause_reading() |
| 244 | |
| 245 | def handle_close(self, event: events.CloseConnection) -> None: |
| 246 | if self.conn.state == ConnectionState.REMOTE_CLOSING: |
| 247 | self.transport.write(self.conn.send(event.response())) |
| 248 | self.queue.put_nowait({"type": "websocket.disconnect", "code": event.code, "reason": event.reason}) |
| 249 | self.transport.close() |
| 250 | |
| 251 | def handle_ping(self, event: events.Ping) -> None: |
| 252 | self.transport.write(self.conn.send(event.response())) |
no test coverage detected