MCPcopy
hub / github.com/encode/httpx / test_response_async_streaming_picklable

Function test_response_async_streaming_picklable

tests/models/test_responses.py:977–993  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

975
976@pytest.mark.anyio
977async def test_response_async_streaming_picklable():
978 response = httpx.Response(200, content=async_streaming_body())
979 pickle_response = pickle.loads(pickle.dumps(response))
980 with pytest.raises(httpx.ResponseNotRead):
981 pickle_response.content # noqa: B018
982 with pytest.raises(httpx.StreamClosed):
983 await pickle_response.aread()
984 assert pickle_response.is_stream_consumed is False
985 assert pickle_response.num_bytes_downloaded == 0
986 assert pickle_response.headers == {"Transfer-Encoding": "chunked"}
987
988 response = httpx.Response(200, content=async_streaming_body())
989 await response.aread()
990 pickle_response = pickle.loads(pickle.dumps(response))
991 assert pickle_response.is_stream_consumed is True
992 assert pickle_response.content == b"Hello, world!"
993 assert pickle_response.num_bytes_downloaded == 13
994
995
996def test_response_decode_text_using_autodetect():

Callers

nothing calls this directly

Calls 3

areadMethod · 0.95
async_streaming_bodyFunction · 0.85
areadMethod · 0.45

Tested by

no test coverage detected