(self)
| 323 | @unittest.skipIf(not SUPPORT_MULTITHREADING, |
| 324 | "zstd build doesn't support multi-threaded compression") |
| 325 | def test_zstd_multithread_compress(self): |
| 326 | size = 40*_1M |
| 327 | b = THIS_FILE_BYTES * (size // len(THIS_FILE_BYTES)) |
| 328 | |
| 329 | options = {CompressionParameter.compression_level : 4, |
| 330 | CompressionParameter.nb_workers : 2} |
| 331 | |
| 332 | # compress() |
| 333 | dat1 = compress(b, options=options) |
| 334 | dat2 = decompress(dat1) |
| 335 | self.assertEqual(dat2, b) |
| 336 | |
| 337 | # ZstdCompressor |
| 338 | c = ZstdCompressor(options=options) |
| 339 | dat1 = c.compress(b, c.CONTINUE) |
| 340 | dat2 = c.compress(b, c.FLUSH_BLOCK) |
| 341 | dat3 = c.compress(b, c.FLUSH_FRAME) |
| 342 | dat4 = decompress(dat1+dat2+dat3) |
| 343 | self.assertEqual(dat4, b * 3) |
| 344 | |
| 345 | # ZstdFile |
| 346 | with ZstdFile(io.BytesIO(), 'w', options=options) as f: |
| 347 | f.write(b) |
| 348 | |
| 349 | def test_compress_flushblock(self): |
| 350 | point = len(THIS_FILE_BYTES) // 2 |
nothing calls this directly
no test coverage detected