MCPcopy Index your code
hub / github.com/python/cpython / test_compressor

Method test_compressor

Lib/test/test_free_threading/test_bz2.py:18–30  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

16@threading_helper.requires_working_threading()
17class TestBZ2(unittest.TestCase):
18 def test_compressor(self):
19 bz2c = BZ2Compressor()
20
21 def worker():
22 # it should return empty bytes as it buffers data internally
23 data = bz2c.compress(TEXT)
24 self.assertEqual(data, b"")
25
26 run_concurrently(worker_func=worker, nthreads=NTHREADS)
27 data = bz2c.flush()
28 # The decompressed data should be TEXT repeated NTHREADS times
29 decompressed = ext_decompress(data)
30 self.assertEqual(decompressed, TEXT * NTHREADS)
31
32 def test_decompressor(self):
33 chunk_size = 128

Callers

nothing calls this directly

Calls 4

run_concurrentlyFunction · 0.90
ext_decompressFunction · 0.90
flushMethod · 0.45
assertEqualMethod · 0.45

Tested by

no test coverage detected