Execute on_message, returning its Future if it is a coroutine.
(self, opcode: int, data: bytes)
| 1218 | await handled_future |
| 1219 | |
| 1220 | def _handle_message(self, opcode: int, data: bytes) -> "Optional[Future[None]]": |
| 1221 | """Execute on_message, returning its Future if it is a coroutine.""" |
| 1222 | if self.client_terminated: |
| 1223 | return None |
| 1224 | |
| 1225 | if self._frame_compressed: |
| 1226 | assert self._decompressor is not None |
| 1227 | try: |
| 1228 | data = self._decompressor.decompress(data) |
| 1229 | except _DecompressTooLargeError: |
| 1230 | self.close(1009, "message too big after decompression") |
| 1231 | self._abort() |
| 1232 | return None |
| 1233 | |
| 1234 | if opcode == 0x1: |
| 1235 | # UTF-8 data |
| 1236 | self._message_bytes_in += len(data) |
| 1237 | try: |
| 1238 | decoded = data.decode("utf-8") |
| 1239 | except UnicodeDecodeError: |
| 1240 | self._abort() |
| 1241 | return None |
| 1242 | return self._run_callback(self.handler.on_message, decoded) |
| 1243 | elif opcode == 0x2: |
| 1244 | # Binary data |
| 1245 | self._message_bytes_in += len(data) |
| 1246 | return self._run_callback(self.handler.on_message, data) |
| 1247 | elif opcode == 0x8: |
| 1248 | # Close |
| 1249 | self.client_terminated = True |
| 1250 | if len(data) >= 2: |
| 1251 | self.close_code = struct.unpack(">H", data[:2])[0] |
| 1252 | if len(data) > 2: |
| 1253 | self.close_reason = to_unicode(data[2:]) |
| 1254 | # Echo the received close code, if any (RFC 6455 section 5.5.1). |
| 1255 | self.close(self.close_code) |
| 1256 | elif opcode == 0x9: |
| 1257 | # Ping |
| 1258 | try: |
| 1259 | self._write_frame(True, 0xA, data) |
| 1260 | except StreamClosedError: |
| 1261 | self._abort() |
| 1262 | self._run_callback(self.handler.on_ping, data) |
| 1263 | elif opcode == 0xA: |
| 1264 | # Pong |
| 1265 | self._received_pong = True |
| 1266 | return self._run_callback(self.handler.on_pong, data) |
| 1267 | else: |
| 1268 | self._abort() |
| 1269 | return None |
| 1270 | |
| 1271 | def close(self, code: Optional[int] = None, reason: Optional[str] = None) -> None: |
| 1272 | """Closes the WebSocket connection.""" |
no test coverage detected