(self, data: bytes)
| 178 | self.seen_data = False |
| 179 | |
def decode(self, data: bytes) -> bytes:
    """Decompress one chunk of a zstd-encoded body.

    The body may consist of several concatenated zstd frames. Each time
    the current decompression object reports ``eof``, a fresh one is
    created for whatever input follows, whether that input is left over
    in ``unused_data`` from this chunk or arrives in a later call.

    Args:
        data: The next chunk of compressed bytes.

    Returns:
        The bytes decompressed from this chunk (possibly empty).

    Raises:
        DecodingError: If ``zstandard`` raises ``ZstdError`` while
            decompressing.
    """
    assert zstandard is not None
    self.seen_data = True
    output = io.BytesIO()
    try:
        if self.decompressor.eof:
            # The previous frame ended exactly at the end of an earlier
            # chunk. A one-shot decompression object that has finished
            # its frame rejects further input, so a new frame needs a
            # new object.
            if not data:
                # Nothing new to decode; keep the finished decompressor
                # so its ``eof`` state is still visible afterwards.
                return output.getvalue()
            self.decompressor = zstandard.ZstdDecompressor().decompressobj()
        output.write(self.decompressor.decompress(data))
        # Bytes following a completed frame inside the same chunk are
        # handed to a new decompressor as the start of the next frame.
        while self.decompressor.eof and self.decompressor.unused_data:
            unused_data = self.decompressor.unused_data
            self.decompressor = zstandard.ZstdDecompressor().decompressobj()
            output.write(self.decompressor.decompress(unused_data))
    except zstandard.ZstdError as exc:
        raise DecodingError(str(exc)) from exc
    return output.getvalue()
| 193 | |
| 194 | def flush(self) -> bytes: |
| 195 | if not self.seen_data: |
nothing calls this directly
no test coverage detected