Decompress gzip-compressed data in one shot. Return the decompressed bytes.
(data)
| 637 | |
| 638 | |
def decompress(data):
    """Decompress gzip data in one shot and return the result as bytes.

    *data* may hold several gzip members back to back; each one is
    inflated and verified in turn, and the outputs are concatenated.
    NUL bytes directly following a member's trailer are skipped.

    Raises EOFError when a member's deflate stream or its 8-byte trailer
    is incomplete, and BadGzipFile when the trailer's CRC-32 or length
    field does not match the decompressed output.
    """
    members = []
    remaining = data
    while True:
        stream = io.BytesIO(remaining)
        # No header could be read (presumably the input is exhausted --
        # confirm against _read_gzip_header): stop and return the result.
        if _read_gzip_header(stream) is None:
            break
        payload_start = stream.tell()
        # Negative wbits selects a raw deflate decompressor, since the
        # gzip header has already been consumed above.
        inflater = zlib.decompressobj(wbits=-zlib.MAX_WBITS)
        output = inflater.decompress(remaining[payload_start:])
        # Bytes past the end of the deflate stream: the member trailer
        # followed by any further members.
        leftover = inflater.unused_data
        if not (inflater.eof and len(leftover) >= 8):
            raise EOFError("Compressed file ended before the end-of-stream "
                           "marker was reached")
        # Trailer layout: two little-endian unsigned 32-bit integers.
        expected_crc, expected_size = struct.unpack("<II", leftover[:8])
        if zlib.crc32(output) != expected_crc:
            raise BadGzipFile("CRC check failed")
        # Only the low 32 bits of the output size are compared.
        if (len(output) & 0xffffffff) != expected_size:
            raise BadGzipFile("Incorrect length of data produced")
        members.append(output)
        remaining = leftover[8:].lstrip(b"\x00")
    return b"".join(members)
| 662 | |
| 663 | |
| 664 | def main(): |
nothing calls this directly
no test coverage detected
searching dependent graphs…