MCPcopy
hub / github.com/encode/uvicorn / __call__

Method __call__

uvicorn/middleware/proxy_headers.py:27–62  ·  view source on GitHub ↗
(self, scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable)

Source from the content-addressed store, hash-verified

25 self.trusted_hosts = _TrustedHosts(trusted_hosts)
26
async def __call__(self, scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable) -> None:
    """Apply forwarded scheme/client information to ``scope``, then call the wrapped app.

    Lifespan scopes, and scopes whose connecting peer host is not in
    ``self.trusted_hosts``, are passed through untouched.  For a trusted peer:

    * the last ``x-forwarded-proto`` header, if it is one of ``http``,
      ``https``, ``ws`` or ``wss``, replaces ``scope["scheme"]`` (for
      websocket scopes ``http`` is rewritten to ``ws`` first);
    * all ``x-forwarded-for`` header values are joined with ``", "`` and
      handed to ``self.trusted_hosts.get_trusted_client_address``; when the
      returned host is truthy, ``scope["client"]`` becomes ``(host, port)``.

    ``scope`` is mutated in place.  The result of ``self.app`` is awaited and
    returned as-is.
    """
    if scope["type"] == "lifespan":
        return await self.app(scope, receive, send)

    # Forwarded headers are only honoured when the direct peer is trusted.
    peer = scope.get("client")
    peer_host = peer[0] if peer else None
    if peer_host not in self.trusted_hosts:
        return await self.app(scope, receive, send)

    # Single pass over the headers: keep the last proto value, gather every
    # forwarded-for value in the order received.
    raw_proto: bytes | None = None
    forwarded_for_chunks: list[bytes] = []
    for header_name, header_value in scope["headers"]:
        if header_name == b"x-forwarded-proto":
            raw_proto = header_value
        elif header_name == b"x-forwarded-for":
            forwarded_for_chunks.append(header_value)

    if raw_proto is not None:
        proto = raw_proto.decode("latin1").strip()
        if proto in {"http", "https", "ws", "wss"}:
            # Websocket scopes use ws/wss, so map http(s) onto them.
            scope["scheme"] = proto.replace("http", "ws") if scope["type"] == "websocket" else proto

    if forwarded_for_chunks:
        combined = b", ".join(forwarded_for_chunks)
        fwd_host, fwd_port = self.trusted_hosts.get_trusted_client_address(combined.decode("latin1"))

        # An empty x-forwarded-for header yields an empty host string; only
        # override the client when something usable came back.
        # See: https://github.com/Kludex/uvicorn/issues/1068
        if fwd_host:
            scope["client"] = (fwd_host, fwd_port)

    return await self.app(scope, receive, send)
63
64
65def _parse_raw_hosts(value: str) -> list[str]:

Callers

nothing calls this directly

Calls 2

joinMethod · 0.80

Tested by

no test coverage detected