hub / github.com/encode/httpx / test_streaming

Function test_streaming

tests/test_decoders.py:186–201
()

Source from the content-addressed store, hash-verified

@pytest.mark.anyio
async def test_streaming():
    body = b"test 123"
    compressor = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)

    async def compress(body: bytes) -> typing.AsyncIterator[bytes]:
        yield compressor.compress(body)
        yield compressor.flush()

    headers = [(b"Content-Encoding", b"gzip")]
    response = httpx.Response(
        200,
        headers=headers,
        content=compress(body),
    )
    assert not hasattr(response, "body")
    assert await response.aread() == body
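The test relies on two ideas: `zlib.MAX_WBITS | 16` makes `zlib` emit the gzip container, and a body supplied as an async iterator is consumed and decoded chunk by chunk. The sketch below illustrates both using only the standard library. It is not httpx's implementation; `gzip_chunks` and `read_all` are hypothetical names introduced here for illustration.

```python
import asyncio
import typing
import zlib


async def gzip_chunks(body: bytes) -> typing.AsyncIterator[bytes]:
    # Hypothetical stand-in for the test's `compress` helper.
    # MAX_WBITS | 16 selects the gzip container instead of the zlib one.
    compressor = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)
    yield compressor.compress(body)
    yield compressor.flush()


async def read_all(chunks: typing.AsyncIterator[bytes]) -> bytes:
    # Hypothetical helper: decode each chunk as it arrives, then join.
    # The same wbits value tells the decompressor to expect gzip framing.
    decompressor = zlib.decompressobj(zlib.MAX_WBITS | 16)
    parts = []
    async for chunk in chunks:
        parts.append(decompressor.decompress(chunk))
    parts.append(decompressor.flush())
    return b"".join(parts)


# Round-trip the same payload the test uses.
decoded = asyncio.run(read_all(gzip_chunks(b"test 123")))
```

Decoding incrementally, rather than buffering the whole compressed body first, is what allows a streamed response to be read without knowing its length in advance.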

Callers

nothing calls this directly

Calls 2

aread · Method · 0.95
compress · Function · 0.85

Tested by

no test coverage detected