(data: bytes, *, max_size: int = 0)
| 34 | |
| 35 | |
| 36 | def _inflate(data: bytes, *, max_size: int = 0) -> bytes: |
| 37 | decompressor = zlib.decompressobj() |
| 38 | try: |
| 39 | first_chunk = decompressor.decompress(data, max_length=_CHUNK_SIZE) |
| 40 | except zlib.error: |
| 41 | # to work with raw deflate content that may be sent by microsoft servers. |
| 42 | decompressor = zlib.decompressobj(wbits=-15) |
| 43 | first_chunk = decompressor.decompress(data, max_length=_CHUNK_SIZE) |
| 44 | decompressed_size = len(first_chunk) |
| 45 | _check_max_size(decompressed_size, max_size) |
| 46 | output_stream = BytesIO() |
| 47 | output_stream.write(first_chunk) |
| 48 | while decompressor.unconsumed_tail: |
| 49 | output_chunk = decompressor.decompress( |
| 50 | decompressor.unconsumed_tail, max_length=_CHUNK_SIZE |
| 51 | ) |
| 52 | decompressed_size += len(output_chunk) |
| 53 | _check_max_size(decompressed_size, max_size) |
| 54 | output_stream.write(output_chunk) |
| 55 | if tail := decompressor.flush(): |
| 56 | decompressed_size += len(tail) |
| 57 | _check_max_size(decompressed_size, max_size) |
| 58 | output_stream.write(tail) |
| 59 | return output_stream.getvalue() |
| 60 | |
| 61 | |
| 62 | def _unbrotli(data: bytes, *, max_size: int = 0) -> bytes: |
no test coverage detected