MCPcopy
hub / github.com/encode/starlette / receive_json

Method receive_json

starlette/websockets.py:130–142  ·  view source on GitHub ↗
(self, mode: str = "text")

Source from the content-addressed store, hash-verified

128 return cast(bytes, message["bytes"])
129
130 async def receive_json(self, mode: str = "text") -> Any:
131 if mode not in {"text", "binary"}:
132 raise RuntimeError('The "mode" argument should be "text" or "binary".')
133 if self.application_state != WebSocketState.CONNECTED:
134 raise RuntimeError('WebSocket is not connected. Need to call "accept" first.')
135 message = await self.receive()
136 self._raise_on_disconnect(message)
137
138 if mode == "text":
139 text = message["text"]
140 else:
141 text = message["bytes"].decode("utf-8")
142 return json.loads(text)
143
144 async def iter_text(self) -> AsyncIterator[str]:
145 try:

Callers 15

iter_jsonMethod · 0.95
test_websocket_urlFunction · 0.95
appFunction · 0.95
test_websocket_headersFunction · 0.95
test_websocket_portFunction · 0.95
test_websocket_iter_jsonFunction · 0.95
asgiFunction · 0.95

Calls 3

receiveMethod · 0.95
_raise_on_disconnectMethod · 0.95
decodeMethod · 0.80

Tested by 15

test_websocket_urlFunction · 0.76
appFunction · 0.76
test_websocket_headersFunction · 0.76
test_websocket_portFunction · 0.76
test_websocket_iter_jsonFunction · 0.76
asgiFunction · 0.76
test_websocket_stateFunction · 0.36