(self, message: ASGISendEvent)
| 257 | self.transport.close() |
| 258 | |
| 259 | async def asgi_send(self, message: ASGISendEvent) -> None: |
| 260 | if not self.handshake_started_event.is_set(): |
| 261 | if message["type"] == "websocket.accept": |
| 262 | self.logger.info( |
| 263 | '%s - "WebSocket %s" [accepted]', |
| 264 | get_client_addr(self.scope), |
| 265 | get_path_with_query_string(self.scope), |
| 266 | ) |
| 267 | self.initial_response = None |
| 268 | self.accepted_subprotocol = cast(Subprotocol | None, message.get("subprotocol")) |
| 269 | if "headers" in message: |
| 270 | self.extra_headers.extend( |
| 271 | # ASGI spec requires bytes |
| 272 | # But for compatibility we need to convert it to strings |
| 273 | (name.decode("latin-1"), value.decode("latin-1")) |
| 274 | for name, value in message["headers"] |
| 275 | ) |
| 276 | self.handshake_started_event.set() |
| 277 | |
| 278 | elif message["type"] == "websocket.close": |
| 279 | self.logger.info( |
| 280 | '%s - "WebSocket %s" 403', |
| 281 | get_client_addr(self.scope), |
| 282 | get_path_with_query_string(self.scope), |
| 283 | ) |
| 284 | self.initial_response = (http.HTTPStatus.FORBIDDEN, [], b"") |
| 285 | self.handshake_started_event.set() |
| 286 | self.closed_event.set() |
| 287 | |
| 288 | elif message["type"] == "websocket.http.response.start": |
| 289 | self.logger.info( |
| 290 | '%s - "WebSocket %s" %d', |
| 291 | get_client_addr(self.scope), |
| 292 | get_path_with_query_string(self.scope), |
| 293 | message["status"], |
| 294 | ) |
| 295 | # websockets requires the status to be an enum. look it up. |
| 296 | status = http.HTTPStatus(message["status"]) |
| 297 | headers = [ |
| 298 | (name.decode("latin-1"), value.decode("latin-1")) for name, value in message.get("headers", []) |
| 299 | ] |
| 300 | self.initial_response = (status, headers, b"") |
| 301 | self.handshake_started_event.set() |
| 302 | |
| 303 | else: |
| 304 | raise RuntimeError( |
| 305 | "Expected ASGI message 'websocket.accept', 'websocket.close', " |
| 306 | f"or 'websocket.http.response.start' but got '{message['type']}'." |
| 307 | ) |
| 308 | |
| 309 | elif not self.closed_event.is_set() and self.initial_response is None: |
| 310 | await self.handshake_completed_event.wait() |
| 311 | |
| 312 | try: |
| 313 | if message["type"] == "websocket.send": |
| 314 | bytes_data = message.get("bytes") |
| 315 | text_data = message.get("text") |
| 316 | data = text_data if bytes_data is None else bytes_data |
nothing calls this directly
no test coverage detected