MCPcopy
hub / github.com/urllib3/urllib3 / absolute_uri

Method absolute_uri

dummyserver/asgi_proxy.py:49–90  ·  view source on GitHub ↗
(
        self,
        scope: HTTPScope,
        receive: ASGIReceiveCallable,
        send: ASGISendCallable,
    )

Source from the content-addressed store, hash-verified

47 raise ValueError(scope["method"])
48
async def absolute_uri(
    self,
    scope: HTTPScope,
    receive: ASGIReceiveCallable,
    send: ASGISendCallable,
) -> None:
    """Replay the incoming request with httpx and relay the reply over ASGI.

    The request described by ``scope`` (method, path, query string and
    headers) plus the body read from ``receive`` is re-issued through an
    ``httpx.AsyncClient``. The upstream status code, a fixed subset of
    the upstream headers, a freshly computed ``Content-Length`` and the
    upstream body are then sent back through ``send`` as a single
    non-chunked response.

    NOTE(review): ``scope["path"]`` is passed as the request URL, so it
    presumably carries an absolute URI here -- confirm against the caller.
    """
    # Fall back to httpx's default verification when no context is set.
    verify = self.ssl_context or True
    async with httpx.AsyncClient(verify=verify) as client:
        method = scope["method"]
        target = scope["path"]
        query = scope["query_string"].decode()
        request_headers = list(scope["headers"])
        payload = await _read_body(receive)
        upstream = await client.request(
            method=method,
            url=target,
            params=query,
            headers=request_headers,
            content=payload,
        )

    # Only these upstream headers are relayed, and only when non-empty.
    relayed_names = ("Date", "Cache-Control", "Server", "Content-Type", "Location")
    candidates = ((name, upstream.headers.get(name)) for name in relayed_names)
    response_headers = [
        (name.encode(), value.encode()) for name, value in candidates if value
    ]

    # Content-Length is derived from the body actually being sent.
    body = upstream.content
    response_headers.append((b"Content-Length", str(len(body)).encode()))

    start_event = HTTPResponseStartEvent(
        type="http.response.start",
        status=upstream.status_code,
        headers=response_headers,
    )
    await send(start_event)

    body_event = HTTPResponseBodyEvent(
        type="http.response.body",
        body=body,
        more_body=False,
    )
    await send(body_event)
91
92 async def connect(self, scope: HTTPScope, send: ASGISendCallable) -> None:
93 async def start_forward(

Callers 1

__call__Method · 0.95

Calls 3

_read_bodyFunction · 0.85
getMethod · 0.80
requestMethod · 0.45

Tested by

no test coverage detected