MCPcopy Index your code
hub / github.com/python/cpython / test_zstd_multithread_compress

Method test_zstd_multithread_compress

Lib/test/test_zstd.py:325–347  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

323 @unittest.skipIf(not SUPPORT_MULTITHREADING,
324 "zstd build doesn't support multi-threaded compression")
325 def test_zstd_multithread_compress(self):
326 size = 40*_1M
327 b = THIS_FILE_BYTES * (size // len(THIS_FILE_BYTES))
328
329 options = {CompressionParameter.compression_level : 4,
330 CompressionParameter.nb_workers : 2}
331
332 # compress()
333 dat1 = compress(b, options=options)
334 dat2 = decompress(dat1)
335 self.assertEqual(dat2, b)
336
337 # ZstdCompressor
338 c = ZstdCompressor(options=options)
339 dat1 = c.compress(b, c.CONTINUE)
340 dat2 = c.compress(b, c.FLUSH_BLOCK)
341 dat3 = c.compress(b, c.FLUSH_FRAME)
342 dat4 = decompress(dat1+dat2+dat3)
343 self.assertEqual(dat4, b * 3)
344
345 # ZstdFile
346 with ZstdFile(io.BytesIO(), 'w', options=options) as f:
347 f.write(b)
348
349 def test_compress_flushblock(self):
350 point = len(THIS_FILE_BYTES) // 2

Callers

nothing calls this directly

Calls 6

compressFunction · 0.90
decompressFunction · 0.90
ZstdFileClass · 0.90
assertEqualMethod · 0.45
compressMethod · 0.45
writeMethod · 0.45

Tested by

no test coverage detected