(self, read_amt: int | None)
| 522 | ), |
| 523 | ) |
| 524 | def test_decode_multiframe_zstd(self, read_amt: int | None) -> None: |
| 525 | data = ( |
| 526 | # Zstandard frame |
| 527 | zstd_compress(b"foo") |
| 528 | # skippable frame (must be ignored) |
| 529 | + bytes.fromhex( |
| 530 | "50 2A 4D 18" # Magic_Number (little-endian) |
| 531 | "07 00 00 00" # Frame_Size (little-endian) |
| 532 | "00 00 00 00 00 00 00" # User_Data |
| 533 | ) |
| 534 | # Zstandard frame |
| 535 | + zstd_compress(b"bar") |
| 536 | ) |
| 537 | |
| 538 | fp = BytesIO(data) |
| 539 | result = bytearray() |
| 540 | r = HTTPResponse( |
| 541 | fp, headers={"content-encoding": "zstd"}, preload_content=False |
| 542 | ) |
| 543 | total_length = 6 |
| 544 | while len(result) < total_length: |
| 545 | chunk = r.read(read_amt, decode_content=True) |
| 546 | if read_amt is None: |
| 547 | assert len(chunk) == total_length |
| 548 | else: |
| 549 | assert len(chunk) == min(read_amt, total_length - len(result)) |
| 550 | result += chunk |
| 551 | assert bytes(result) == b"foobar" |
| 552 | |
| 553 | @onlyZstd() |
| 554 | def test_decode_multiframe_zstd_with_max_length_close_to_compressed_data_size( |
nothing calls this directly
no test coverage detected