| 130 | } |
| 131 | |
| 132 | func decompress(cc CompressionCodec, data []byte) ([]byte, error) { |
| 133 | // 0 means unbounded |
| 134 | limit := int(MaxDecompressedBatchSize) |
| 135 | switch cc { |
| 136 | case CompressionNone: |
| 137 | return data, nil |
| 138 | case CompressionGZIP: |
| 139 | var err error |
| 140 | reader, ok := gzipReaderPool.Get().(*gzip.Reader) |
| 141 | if !ok { |
| 142 | reader, err = gzip.NewReader(bytes.NewReader(data)) |
| 143 | } else { |
| 144 | err = reader.Reset(bytes.NewReader(data)) |
| 145 | } |
| 146 | |
| 147 | if err != nil { |
| 148 | return nil, err |
| 149 | } |
| 150 | |
| 151 | res, err := boundedDecompress(cc, reader, limit) |
| 152 | gzipReaderPool.Put(reader) |
| 153 | return res, err |
| 154 | case CompressionSnappy: |
| 155 | return boundedSnappyDecode(cc, data, limit) |
| 156 | case CompressionLZ4: |
| 157 | reader, ok := lz4ReaderPool.Get().(*lz4.Reader) |
| 158 | if !ok { |
| 159 | reader = lz4.NewReader(bytes.NewReader(data)) |
| 160 | } else { |
| 161 | reader.Reset(bytes.NewReader(data)) |
| 162 | } |
| 163 | |
| 164 | res, err := boundedDecompress(cc, reader, limit) |
| 165 | lz4ReaderPool.Put(reader) |
| 166 | return res, err |
| 167 | case CompressionZSTD: |
| 168 | return boundedZstdDecode(cc, data, limit) |
| 169 | default: |
| 170 | return nil, PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", cc)} |
| 171 | } |
| 172 | } |