hub / github.com/encode/httpx / echo_body_with_response_stream

Function echo_body_with_response_stream

tests/test_wsgi.py:48–64
(
    environ: WSGIEnvironment, start_response: StartResponse
)

Source from the content-addressed store, hash-verified

def echo_body_with_response_stream(
    environ: WSGIEnvironment, start_response: StartResponse
) -> typing.Iterable[bytes]:
    status = "200 OK"

    response_headers = [("Content-Type", "text/plain")]

    start_response(status, response_headers)

    def output_generator(f: typing.IO[bytes]) -> typing.Iterator[bytes]:
        while True:
            output = f.read(2)
            if not output:
                break
            yield output

    return output_generator(f=environ["wsgi.input"])
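
A minimal sketch of how this WSGI app behaves when driven by hand, using only the standard library. The function body is restated (without the type aliases) so the block runs on its own. The names fake_start_response, recorded, and the one-key environ are hypothetical stand-ins for what a real server or transport would supply; they are not part of the repository.

```python
import io
import typing


def echo_body_with_response_stream(environ, start_response):
    # Restated from the listing so this sketch is self-contained.
    start_response("200 OK", [("Content-Type", "text/plain")])

    def output_generator(f: typing.IO[bytes]) -> typing.Iterator[bytes]:
        # Read the request body two bytes at a time until it is exhausted.
        while True:
            output = f.read(2)
            if not output:
                break
            yield output

    return output_generator(f=environ["wsgi.input"])


# Hypothetical stand-in for the server's start_response callable:
# it only records what the app passes in.
recorded: typing.Dict[str, typing.Any] = {}


def fake_start_response(status, headers, exc_info=None):
    recorded["status"] = status
    recorded["headers"] = headers


# Assumption: a one-key environ is enough here, because the app
# only ever reads "wsgi.input".
environ = {"wsgi.input": io.BytesIO(b"example")}

body = echo_body_with_response_stream(environ, fake_start_response)
chunks = list(body)  # iterating the generator is what reads the input

print(recorded["status"])  # 200 OK
print(chunks)              # [b'ex', b'am', b'pl', b'e']
```

start_response is called as soon as the app is invoked, while the request body is read only as the returned generator is consumed, so the response is streamed in chunks of at most two bytes.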

Callers

nothing calls this directly

Calls 1

output_generator · Function · 0.85

Tested by

no test coverage detected