MCPcopy
hub / github.com/encode/uvicorn / handle_connect

Method handle_connect

uvicorn/protocols/websockets/wsproto_impl.py:199–225  ·  view source on GitHub ↗
(self, event: events.Request)

Source from the content-addressed store, hash-verified

197 # Event handlers
198
def handle_connect(self, event: events.Request) -> None:
    """Build the ASGI ``websocket`` scope for ``event`` and start the app task.

    The scope is assembled from the request's host, extra headers, target and
    subprotocols together with this protocol's own connection attributes, and
    is stored on ``self.scope``. A ``websocket.connect`` message is then put
    on ``self.queue`` and ``self.run_asgi()`` is scheduled on ``self.loop``.

    Args:
        event: The request event carrying ``host``, ``extra_headers``,
            ``target`` and ``subprotocols``.
    """
    # The host is a separate attribute on the event, so it is emitted first;
    # the remaining header names are lower-cased, values are passed through.
    headers = [(b"host", event.host.encode())]
    headers.extend((name.lower(), value) for name, value in event.extra_headers)

    # Everything before the first "?" is the path, everything after it is the
    # query string (empty when the target has no "?").
    raw_path, _sep, query_string = event.target.partition("?")
    decoded_path = unquote(raw_path)
    root_path = self.root_path

    self.scope: WebSocketScope = {
        "type": "websocket",
        "asgi": {"version": self.config.asgi_version, "spec_version": "2.4"},
        "http_version": "1.1",
        "scheme": self.scheme,
        "server": self.server,
        "client": self.client,
        "root_path": root_path,
        # "path" is percent-decoded text; "raw_path" keeps the bytes as sent.
        # Both are prefixed with the root path.
        "path": root_path + decoded_path,
        "raw_path": root_path.encode("ascii") + raw_path.encode("ascii"),
        "query_string": query_string.encode("ascii"),
        "headers": headers,
        "subprotocols": event.subprotocols,
        # A copy of app_state is handed out rather than the object itself.
        "state": self.app_state.copy(),
        "extensions": {"websocket.http.response": {}},
    }

    # Queue the connect message first, then schedule the ASGI run.
    self.queue.put_nowait({"type": "websocket.connect"})
    asgi_task = self.loop.create_task(self.run_asgi())
    asgi_task.add_done_callback(self.on_task_complete)
    self.tasks.add(asgi_task)
226
227 def handle_message(self, event: events.TextMessage | events.BytesMessage) -> None:
228 try:

Callers 1

handle_events · Method · 0.95

Calls 3

run_asgi · Method · 0.95
create_task · Method · 0.45
add_done_callback · Method · 0.45

Tested by

no test coverage detected