MCPcopy
hub / github.com/encode/starlette / decode

Method decode

starlette/endpoints.py:91–117  ·  view source on GitHub ↗
(self, websocket: WebSocket, message: Message)

Source from the content-addressed store, hash-verified

89 await self.on_disconnect(websocket, close_code)
90
async def decode(self, websocket: WebSocket, message: Message) -> Any:
    """Decode a received websocket message according to ``self.encoding``.

    Supported values of ``self.encoding``:

    * ``"text"``  - return the text payload.
    * ``"bytes"`` - return the binary payload.
    * ``"json"``  - parse the text payload (or the UTF-8 decoded binary
      payload when no text is present) as JSON.
    * ``None``    - return whichever payload is present, preferring text.

    A payload key that is present but set to ``None`` is treated the same
    as a missing key.

    Raises:
        RuntimeError: if the payload kind does not match the configured
            encoding, or if the JSON payload is malformed. The websocket
            is closed with ``WS_1003_UNSUPPORTED_DATA`` before raising.
        AssertionError: if ``self.encoding`` is not one of the supported
            values.
    """
    text = message.get("text")
    data = message.get("bytes")

    if self.encoding == "text":
        if text is None:
            await websocket.close(code=status.WS_1003_UNSUPPORTED_DATA)
            raise RuntimeError("Expected text websocket messages, but got bytes")
        return text

    if self.encoding == "bytes":
        if data is None:
            await websocket.close(code=status.WS_1003_UNSUPPORTED_DATA)
            raise RuntimeError("Expected bytes websocket messages, but got text")
        return data

    if self.encoding == "json":
        if text is None:
            # No text payload: fall back to the binary payload as UTF-8.
            text = message["bytes"].decode("utf-8")

        try:
            return json.loads(text)
        except json.decoder.JSONDecodeError as exc:
            await websocket.close(code=status.WS_1003_UNSUPPORTED_DATA)
            raise RuntimeError("Malformed JSON data received.") from exc

    assert self.encoding is None, f"Unsupported 'encoding' attribute {self.encoding}"
    # Compare against None rather than relying on truthiness: an empty text
    # frame ("") is a valid message and must not fall through to "bytes".
    return text if text is not None else message["bytes"]
118
119 async def on_connect(self, websocket: WebSocket) -> None:
120 """Override to handle an incoming websocket connection"""

Callers 15

dispatchMethod · 0.95
receive_jsonMethod · 0.80
handle_requestMethod · 0.80
sendMethod · 0.80
_user_safe_decodeFunction · 0.80
parseMethod · 0.80
parseMethod · 0.80
receive_jsonMethod · 0.80
__init__Method · 0.80
__init__Method · 0.80
keysMethod · 0.80
valuesMethod · 0.80

Calls 2

closeMethod · 0.45
getMethod · 0.45

Tested by 10

receive_jsonMethod · 0.64
handle_requestMethod · 0.64
sendMethod · 0.64
appFunction · 0.64
authenticateMethod · 0.64
handler_that_reads_bodyFunction · 0.64
appFunction · 0.64
multi_items_appFunction · 0.64
app_with_headersFunction · 0.64