MCPcopy
hub / github.com/encode/uvicorn / asgi_send

Method asgi_send

uvicorn/protocols/websockets/websockets_impl.py:259–345  ·  view source on GitHub ↗
(self, message: ASGISendEvent)

Source from the content-addressed store, hash-verified

257 self.transport.close()
258
259 async def asgi_send(self, message: ASGISendEvent) -> None:
260 if not self.handshake_started_event.is_set():
261 if message["type"] == "websocket.accept":
262 self.logger.info(
263 '%s - "WebSocket %s" [accepted]',
264 get_client_addr(self.scope),
265 get_path_with_query_string(self.scope),
266 )
267 self.initial_response = None
268 self.accepted_subprotocol = cast(Subprotocol | None, message.get("subprotocol"))
269 if "headers" in message:
270 self.extra_headers.extend(
271 # ASGI spec requires bytes
272 # But for compatibility we need to convert it to strings
273 (name.decode("latin-1"), value.decode("latin-1"))
274 for name, value in message["headers"]
275 )
276 self.handshake_started_event.set()
277
278 elif message["type"] == "websocket.close":
279 self.logger.info(
280 '%s - "WebSocket %s" 403',
281 get_client_addr(self.scope),
282 get_path_with_query_string(self.scope),
283 )
284 self.initial_response = (http.HTTPStatus.FORBIDDEN, [], b"")
285 self.handshake_started_event.set()
286 self.closed_event.set()
287
288 elif message["type"] == "websocket.http.response.start":
289 self.logger.info(
290 '%s - "WebSocket %s" %d',
291 get_client_addr(self.scope),
292 get_path_with_query_string(self.scope),
293 message["status"],
294 )
295 # websockets requires the status to be an enum. look it up.
296 status = http.HTTPStatus(message["status"])
297 headers = [
298 (name.decode("latin-1"), value.decode("latin-1")) for name, value in message.get("headers", [])
299 ]
300 self.initial_response = (status, headers, b"")
301 self.handshake_started_event.set()
302
303 else:
304 raise RuntimeError(
305 "Expected ASGI message 'websocket.accept', 'websocket.close', "
306 f"or 'websocket.http.response.start' but got '{message['type']}'."
307 )
308
309 elif not self.closed_event.is_set() and self.initial_response is None:
310 await self.handshake_completed_event.wait()
311
312 try:
313 if message["type"] == "websocket.send":
314 bytes_data = message.get("bytes")
315 text_data = message.get("text")
316 data = text_data if bytes_data is None else bytes_data

Callers

nothing calls this directly

Calls 5

get_client_addr — Function · 0.90
extend — Method · 0.80
send — Method · 0.45
close — Method · 0.45

Tested by

no test coverage detected