MCPcopy
hub / github.com/encode/starlette / _raise_on_close

Method _raise_on_close

starlette/testclient.py:150–163  ·  starlette/testclient.py::WebSocketTestSession._raise_on_close
(self, message: Message)

Source from the content-addressed store, hash-verified

148 await anyio.sleep_forever()
149
150 def _raise_on_close(self, message: Message) -> None:
151 if message[class="st">"type"] == class="st">"websocket.close":
152 raise WebSocketDisconnect(code=message.get(class="st">"code", 1000), reason=message.get(class="st">"reason", class="st">""))
153 elif message[class="st">"type"] == class="st">"websocket.http.response.start":
154 status_code: int = message[class="st">"status"]
155 headers: list[tuple[bytes, bytes]] = message[class="st">"headers"]
156 body: list[bytes] = []
157 while True:
158 message = self.receive()
159 assert message[class="st">"type"] == class="st">"websocket.http.response.body"
160 body.append(message[class="st">"body"])
161 if not message.get(class="st">"more_body", False):
162 break
163 raise WebSocketDenialResponse(status_code=status_code, headers=headers, content=bclass="st">"".join(body))
164
165 def send(self, message: Message) -> None:
166 self.portal.call(self._receive_tx.send, message)

Callers 4

__enter__Method · 0.95
receive_textMethod · 0.95
receive_bytesMethod · 0.95
receive_jsonMethod · 0.95

Calls 5

receiveMethod · 0.95
WebSocketDisconnectClass · 0.90
getMethod · 0.45
appendMethod · 0.45

Tested by

no test coverage detected