Send a close frame.
(self, code, reason="")
| 421 | self.transport.write(bytes(frame)) |
| 422 | |
async def _send_close(self, code, reason=""):
    """Send a close frame, at most once per connection.

    The payload is the status code packed as a 2-byte big-endian
    unsigned integer, optionally followed by the UTF-8 encoded reason.
    The reason is capped at 123 bytes so the whole payload stays within
    125 bytes, and the cap never splits a multi-byte character.

    Args:
        code: Close status code; must fit in an unsigned 16-bit integer.
        reason: Optional human-readable explanation. Empty by default,
            in which case only the status code is sent.
    """
    if self._close_sent:
        return  # Already sent

    payload = struct.pack("!H", code)
    if reason:
        # Max 125 bytes total: 2 for the code + up to 123 for the reason.
        truncated = reason.encode("utf-8")[:123]
        # A byte-offset slice can cut a multi-byte character in half,
        # which would put invalid UTF-8 on the wire. Round-tripping with
        # errors="ignore" drops only that dangling partial sequence,
        # because everything before it is already valid UTF-8.
        payload += truncated.decode("utf-8", errors="ignore").encode("utf-8")

    # NOTE(review): the flag is set only after the send completes. If
    # _send_frame can yield to the event loop, a concurrent caller could
    # pass the guard above and send a second close frame -- confirm.
    await self._send_frame(OPCODE_CLOSE, payload)
    self._close_sent = True

    # If we already received a close, handshake is complete
    if self._close_received:
        self.closed = True
        self._close_event.set()
no test coverage detected