Read and process incoming WebSocket frames.
(self)
| 212 | self.transport.write("".join(headers).encode("latin-1")) |
| 213 | |
async def _read_frames(self):
    """Pump inbound WebSocket frames until the connection winds down.

    Every frame obtained from ``_read_frame`` is dispatched on its opcode:

    * close        -> handed to ``_handle_close``, then the loop ends
    * ping         -> answered with a pong carrying the same payload
    * pong         -> dropped
    * text         -> queued as ``websocket.receive`` with a ``text`` key
                      (payload decoded as UTF-8)
    * binary       -> queued as ``websocket.receive`` with a ``bytes`` key
    * continuation -> handed to ``_handle_continuation``

    The loop also ends when ``_read_frame`` returns ``None``. Cancellation
    is re-raised; any other exception is logged at debug level and
    swallowed. On the way out, if the connection has not already been
    marked closed, it is marked closed and a ``websocket.disconnect`` event
    is queued with ``self.close_code``, or ``CLOSE_ABNORMAL`` when that is
    unset.
    """
    try:
        while True:
            # Stop once closed -- unless we sent a close frame and are still
            # waiting for the peer's close reply (RFC 6455 close handshake).
            if self.closed and (not self._close_sent or self._close_received):
                break

            incoming = await self._read_frame()
            if incoming is None:
                break

            opcode, payload = incoming

            if opcode == OPCODE_CLOSE:
                await self._handle_close(payload)
                break

            if opcode == OPCODE_PING:
                await self._send_frame(OPCODE_PONG, payload)
                continue

            if opcode == OPCODE_PONG:
                # Pongs need no reaction.
                continue

            if opcode == OPCODE_TEXT:
                text_event = {
                    "type": "websocket.receive",
                    "text": payload.decode("utf-8"),
                }
                await self._receive_queue.put(text_event)
                continue

            if opcode == OPCODE_BINARY:
                binary_event = {
                    "type": "websocket.receive",
                    "bytes": payload,
                }
                await self._receive_queue.put(binary_event)
                continue

            if opcode == OPCODE_CONTINUATION:
                # Piece of a fragmented message.
                await self._handle_continuation(payload)

    except asyncio.CancelledError:
        raise
    except Exception as exc:
        self.log.debug("WebSocket read error: %s", exc)
    finally:
        # Tell the application side that the connection is gone.
        if not self.closed:
            self.closed = True
            disconnect_code = self.close_code or CLOSE_ABNORMAL
            await self._receive_queue.put({
                "type": "websocket.disconnect",
                "code": disconnect_code,
            })
| 261 | |
| 262 | async def _read_frame(self): # pylint: disable=too-many-return-statements |
| 263 | """Read a single WebSocket frame. |
no test coverage detected