Process the headers sent by the server to this client connection. 'key' is the websocket handshake challenge/response key.
(
self, key: Union[str, bytes], headers: httputil.HTTPHeaders
)
| 981 | return [] |
| 982 | |
def _process_server_headers(
    self, key: Union[str, bytes], headers: httputil.HTTPHeaders
) -> None:
    """Process the headers sent by the server to this client connection.

    'key' is the websocket handshake challenge/response key.

    The method checks the ``Upgrade``, ``Connection`` and
    ``Sec-Websocket-Accept`` headers, calls ``_create_compressors`` for
    every ``permessage-deflate`` entry returned by
    ``_parse_extensions_header`` (only when compression options are set),
    and stores the ``Sec-WebSocket-Protocol`` header value, or None when
    it is absent, in ``self.selected_subprotocol``.

    Raises:
        AssertionError: if one of the three handshake header checks fails.
        ValueError: if an extension entry is not ``permessage-deflate``,
            or if it is ``permessage-deflate`` while
            ``self._compression_options`` is None.
    """
    # NOTE(review): these checks are ``assert`` statements, so they are
    # skipped entirely when Python runs with -O. They are kept unchanged
    # so the exception type seen by callers stays the same; confirm
    # whether an explicit raise would be acceptable to callers.
    assert headers["Upgrade"].lower() == "websocket"
    assert headers["Connection"].lower() == "upgrade"
    accept = self.compute_accept_value(key)
    assert headers["Sec-Websocket-Accept"] == accept

    extensions = self._parse_extensions_header(headers)
    for ext in extensions:
        if ext[0] == "permessage-deflate" and self._compression_options is not None:
            self._create_compressors("client", ext[1])
        else:
            # Format the message eagerly. Exception constructors do not
            # apply %-style arguments the way logging calls do, and the
            # one-element tuple keeps a sequence-valued ``ext`` from
            # being unpacked by the % operator.
            raise ValueError("unsupported extension %r" % (ext,))

    self.selected_subprotocol = headers.get("Sec-WebSocket-Protocol", None)
| 1003 | |
| 1004 | def _get_compressor_options( |
| 1005 | self, |
nothing calls this directly
no test coverage detected