(self)
| 182 | self.writable.set() # pragma: full coverage |
| 183 | |
| 184 | def shutdown(self) -> None: |
| 185 | self.stop_keepalive() |
| 186 | if self.handshake_complete: |
| 187 | self.queue.put_nowait({"type": "websocket.disconnect", "code": 1012}) |
| 188 | output = self.conn.send(wsproto.events.CloseConnection(code=1012)) |
| 189 | self.transport.write(output) |
| 190 | else: |
| 191 | self.send_500_response() |
| 192 | self.transport.close() |
| 193 | |
| 194 | def on_task_complete(self, task: asyncio.Task[None]) -> None: |
| 195 | self.tasks.discard(task) |
nothing calls this directly
no test coverage detected