MCPcopy
hub / github.com/benoitc/gunicorn / _build_websocket_scope

Method _build_websocket_scope

gunicorn/asgi/protocol.py:1188–1224  ·  view source on GitHub ↗

Build ASGI WebSocket scope from parsed request.

(self, request, sockname, peername)

Source from the content-addressed store, hash-verified

1186 return environ
1187
1188 def _build_websocket_scope(self, request, sockname, peername):
1189 """Build ASGI WebSocket scope from parsed request."""
1190 # Build headers list as bytes tuples
1191 headers = []
1192 for name, value in request.headers:
1193 headers.append((name.lower().encode("latin-1"), value.encode("latin-1")))
1194
1195 # Extract subprotocols from Sec-WebSocket-Protocol header
1196 subprotocols = []
1197 for name, value in request.headers:
1198 if name == "SEC-WEBSOCKET-PROTOCOL":
1199 subprotocols = [s.strip() for s in value.split(",")]
1200 break
1201
1202 server = _normalize_sockaddr(sockname)
1203 client = _normalize_sockaddr(peername)
1204
1205 scope = {
1206 "type": "websocket",
1207 "asgi": {"version": "3.0", "spec_version": "2.4"},
1208 "http_version": f"{request.version[0]}.{request.version[1]}",
1209 "scheme": "wss" if request.scheme == "https" else "ws",
1210 "path": request.path,
1211 "raw_path": request.raw_path if request.raw_path else b"",
1212 "query_string": request.query.encode("latin-1") if request.query else b"",
1213 "root_path": self.cfg.root_path or "",
1214 "headers": headers,
1215 "server": server,
1216 "client": client,
1217 "subprotocols": subprotocols,
1218 }
1219
1220 # Add state dict for lifespan sharing
1221 if hasattr(self.worker, 'state'):
1222 scope["state"] = self.worker.state
1223
1224 return scope
1225
1226 def _send_informational(self, status, headers, request):
1227 """Send an informational response (1xx) such as 103 Early Hints.

Calls 2

_normalize_sockaddrFunction · 0.85
encodeMethod · 0.80