MCPcopy
hub / github.com/urllib3/urllib3 / test_decode_multiframe_zstd

Method test_decode_multiframe_zstd

test/test_response.py:524–551  ·  view source on GitHub ↗
(self, read_amt: int | None)

Source from the content-addressed store, hash-verified

522 ),
523 )
def test_decode_multiframe_zstd(self, read_amt: int | None) -> None:
    """Decode a zstd body made of two frames separated by a skippable frame.

    The payload is ``zstd(b"foo")``, then a skippable frame, then
    ``zstd(b"bar")``. The response is read in a loop with ``read_amt``
    (``None`` means a single unbounded read). Each chunk must have exactly
    the expected length, and the concatenated output must be ``b"foobar"``.
    """
    # Skippable frame (must be ignored), placed between the two real frames.
    skippable_frame = bytes.fromhex(
        "50 2A 4D 18"  # Magic_Number (little-endian)
        "07 00 00 00"  # Frame_Size (little-endian)
        "00 00 00 00 00 00 00"  # User_Data
    )
    payload = zstd_compress(b"foo") + skippable_frame + zstd_compress(b"bar")

    response = HTTPResponse(
        BytesIO(payload),
        headers={"content-encoding": "zstd"},
        preload_content=False,
    )

    total_length = 6
    decoded = bytearray()
    while len(decoded) < total_length:
        # Work out the expected chunk size before reading: everything at
        # once for an unbounded read, otherwise the requested amount capped
        # by what is still outstanding.
        if read_amt is None:
            expected_len = total_length
        else:
            expected_len = min(read_amt, total_length - len(decoded))
        chunk = response.read(read_amt, decode_content=True)
        assert len(chunk) == expected_len
        decoded += chunk
    assert bytes(decoded) == b"foobar"
552
553 @onlyZstd()
554 def test_decode_multiframe_zstd_with_max_length_close_to_compressed_data_size(

Callers

nothing calls this directly

Calls 3

read — Method · 0.95
HTTPResponse — Class · 0.90
zstd_compress — Function · 0.85

Tested by

no test coverage detected