(
self, fin: bool, opcode: int, data: bytes, flags: int = 0
)
| 1049 | ) |
| 1050 | |
def _write_frame(
    self, fin: bool, opcode: int, data: bytes, flags: int = 0
) -> "Future[None]":
    """Serialize a single WebSocket frame and write it to the stream.

    The frame consists of a first byte combining the FIN bit, ``opcode``
    and ``flags``; a length field (with the mask bit set when
    ``self.mask_outgoing`` is true); an optional 4-byte masking key; and
    the (possibly masked) payload.

    Args:
        fin: Whether this is the final frame of a message.
        opcode: Frame opcode; values with bit ``0x8`` set are treated as
            control frames.
        data: Payload bytes.
        flags: Extra bits OR-ed into the first header byte.

    Returns:
        The result of ``self.stream.write`` for the assembled frame.

    Raises:
        ValueError: If a control frame is fragmented (``fin`` is false)
            or carries a payload longer than 125 bytes.
    """
    payload_size = len(data)

    # All control frames MUST have a payload length of 125 bytes or less
    # and MUST NOT be fragmented.
    is_control = bool(opcode & 0x8)
    if is_control and not fin:
        raise ValueError("control frames may not be fragmented")
    if is_control and payload_size > 125:
        raise ValueError("control frame payloads may not exceed 125 bytes")

    first_byte = (self.FIN if fin else 0) | opcode | flags
    header = struct.pack("B", first_byte)

    masked = self.mask_outgoing
    mask_flag = 0x80 if masked else 0

    # Length encoding: short lengths fit in the second byte; otherwise the
    # marker 126 (16-bit) or 127 (64-bit) is followed by the real length
    # in network byte order.
    if payload_size < 126:
        header += struct.pack("B", payload_size | mask_flag)
    elif payload_size <= 0xFFFF:
        header += struct.pack("!BH", 126 | mask_flag, payload_size)
    else:
        header += struct.pack("!BQ", 127 | mask_flag, payload_size)

    if masked:
        # A fresh random 4-byte key precedes the masked payload.
        key = os.urandom(4)
        body = key + _websocket_mask(key, data)
    else:
        body = data

    frame = header + body
    self._wire_bytes_out += len(frame)
    return self.stream.write(frame)
| 1083 | |
| 1084 | def write_message( |
| 1085 | self, message: Union[str, bytes, Dict[str, Any]], binary: bool = False |
no test coverage detected