| 730 | |
| 731 | |
class _PerMessageDeflateCompressor:
    """Compress message payloads as raw deflate streams.

    Each call to `compress` emits a raw (headerless) deflate stream that is
    terminated with a sync flush, with the 4-byte sync-flush marker removed
    from the end (the payload framing used by permessage-deflate, RFC 7692).

    When ``persistent`` is true a single zlib compressor is kept for the
    lifetime of this object, so later messages can refer back to data from
    earlier ones. Otherwise a fresh compressor is created for every call.
    """

    def __init__(
        self,
        persistent: bool,
        max_wbits: Optional[int],
        compression_options: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Validate the settings and optionally create the shared compressor.

        :arg persistent: keep one compressor across `compress` calls
            instead of creating a new one per call.
        :arg max_wbits: base-2 logarithm of the deflate window size;
            ``None`` selects ``zlib.MAX_WBITS``.
        :arg compression_options: optional dict; the keys
            ``"compression_level"`` and ``"mem_level"`` override the
            defaults passed to ``zlib.compressobj``.
        :raises ValueError: if ``max_wbits`` is outside
            ``8..zlib.MAX_WBITS``.
        """
        if max_wbits is None:
            max_wbits = zlib.MAX_WBITS
        # There is no symbolic constant for the minimum wbits value.
        if not (8 <= max_wbits <= zlib.MAX_WBITS):
            # Interpolate explicitly: exceptions do not %-format their
            # arguments, so passing them separately would leave the raw
            # template (and a tuple repr) in the error message.
            raise ValueError(
                "Invalid max_wbits value %r; allowed range 8-%d"
                % (max_wbits, zlib.MAX_WBITS)
            )
        self._max_wbits = max_wbits

        options = compression_options if compression_options is not None else {}

        # The default level is looked up only when the caller did not
        # supply one.
        if "compression_level" in options:
            self._compression_level = options["compression_level"]
        else:
            self._compression_level = tornado.web.GZipContentEncoding.GZIP_LEVEL

        self._mem_level = options.get("mem_level", 8)

        if persistent:
            self._compressor = self._create_compressor()  # type: Optional[_Compressor]
        else:
            self._compressor = None

    def _create_compressor(self) -> "_Compressor":
        """Return a new zlib compressor configured from this object's settings."""
        # A negative wbits value asks zlib for a raw deflate stream with no
        # zlib header or trailing checksum.
        return zlib.compressobj(
            self._compression_level, zlib.DEFLATED, -self._max_wbits, self._mem_level
        )

    def compress(self, data: bytes) -> bytes:
        """Compress ``data`` and return it without the sync-flush trailer."""
        compressor = self._compressor or self._create_compressor()
        data = compressor.compress(data) + compressor.flush(zlib.Z_SYNC_FLUSH)
        # A sync flush ends the output with an empty stored block, which is
        # the fixed byte sequence 00 00 ff ff; it is stripped from the result.
        assert data.endswith(b"\x00\x00\xff\xff")
        return data[:-4]
| 778 | |
| 779 | |
| 780 | class _PerMessageDeflateDecompressor: |