MCPcopy
hub / github.com/scrapy/scrapy / _inflate

Function _inflate

scrapy/utils/_compression.py:36–59  ·  view source on GitHub ↗
(data: bytes, *, max_size: int = 0)

Source from the content-addressed store, hash-verified

34
35
def _inflate(data: bytes, *, max_size: int = 0) -> bytes:
    """Decompress deflate-encoded *data* piece by piece and return the result.

    Decoding is first attempted with a default ``zlib.decompressobj()``. If
    that raises ``zlib.error``, it is retried as a raw deflate stream
    (``wbits=-15``). Each ``decompress`` call is capped with
    ``max_length=_CHUNK_SIZE``, and after every piece the running output size
    is passed to ``_check_max_size`` together with *max_size*.

    NOTE(review): ``_check_max_size`` is defined elsewhere in this module;
    presumably it raises once the size exceeds *max_size* -- confirm there.
    """
    inflater = zlib.decompressobj()
    try:
        piece = inflater.decompress(data, max_length=_CHUNK_SIZE)
    except zlib.error:
        # Raw deflate content (no zlib header) may be sent by Microsoft
        # servers, so start over with a header-less decompressor.
        inflater = zlib.decompressobj(wbits=-15)
        piece = inflater.decompress(data, max_length=_CHUNK_SIZE)

    total_size = 0
    buffer = BytesIO()

    def _store(part: bytes) -> None:
        # Account for the new bytes and run the size check before keeping them.
        nonlocal total_size
        total_size += len(part)
        _check_max_size(total_size, max_size)
        buffer.write(part)

    # The first piece is always checked, even when it is empty.
    _store(piece)

    # Input that did not fit under the max_length cap is kept in
    # ``unconsumed_tail``; keep feeding it back until none is left.
    while True:
        pending = inflater.unconsumed_tail
        if not pending:
            break
        _store(inflater.decompress(pending, max_length=_CHUNK_SIZE))

    # Whatever flush() still returns is only checked when it is non-empty.
    remainder = inflater.flush()
    if remainder:
        _store(remainder)
    return buffer.getvalue()
60
61
62def _unbrotli(data: bytes, *, max_size: int = 0) -> bytes:

Callers 1

_decode (Method) · 0.90

Calls 3

_check_max_size (Function) · 0.85
flush (Method) · 0.80
write (Method) · 0.45

Tested by

no test coverage detected