(min int64, key readBytesFunc, val readBytesFunc)
| 143 | } |
| 144 | |
| 145 | func (r *messageSetReader) readMessageV1(min int64, key readBytesFunc, val readBytesFunc) ( |
| 146 | offset int64, timestamp int64, headers []Header, err error) { |
| 147 | |
| 148 | for r.readerStack != nil { |
| 149 | if r.remain == 0 { |
| 150 | r.readerStack = r.parent |
| 151 | continue |
| 152 | } |
| 153 | if err = r.readHeader(); err != nil { |
| 154 | return |
| 155 | } |
| 156 | offset = r.header.firstOffset |
| 157 | timestamp = r.header.v1.timestamp |
| 158 | var codec CompressionCodec |
| 159 | if codec, err = r.header.compression(); err != nil { |
| 160 | return |
| 161 | } |
| 162 | if r.debug { |
| 163 | r.log("Reading with codec=%T", codec) |
| 164 | } |
| 165 | if codec != nil { |
| 166 | // discard next four bytes...will be -1 to indicate null key |
| 167 | if err = r.discardN(4); err != nil { |
| 168 | return |
| 169 | } |
| 170 | |
| 171 | // read and decompress the contained message set. |
| 172 | r.decompressed.Reset() |
| 173 | if err = r.readBytesWith(func(br *bufio.Reader, sz int, n int) (remain int, err error) { |
| 174 | // x4 as a guess that the average compression ratio is near 75% |
| 175 | r.decompressed.Grow(4 * n) |
| 176 | limitReader := io.LimitedReader{R: br, N: int64(n)} |
| 177 | codecReader := codec.NewReader(&limitReader) |
| 178 | _, err = r.decompressed.ReadFrom(codecReader) |
| 179 | remain = sz - (n - int(limitReader.N)) |
| 180 | codecReader.Close() |
| 181 | return |
| 182 | }); err != nil { |
| 183 | return |
| 184 | } |
| 185 | |
| 186 | // the compressed message's offset will be equal to the offset of |
| 187 | // the last message in the set. within the compressed set, the |
| 188 | // offsets will be relative, so we have to scan through them to |
| 189 | // get the base offset. for example, if there are four compressed |
| 190 | // messages at offsets 10-13, then the container message will have |
| 191 | // offset 13 and the contained messages will be 0,1,2,3. the base |
| 192 | // offset for the container, then is 13-3=10. |
| 193 | if offset, err = extractOffset(offset, r.decompressed.Bytes()); err != nil { |
| 194 | return |
| 195 | } |
| 196 | |
| 197 | // mark the outer message as being read |
| 198 | r.markRead() |
| 199 | |
| 200 | // then push the decompressed bytes onto the stack. |
| 201 | r.readerStack = &readerStack{ |
| 202 | // Allocate a buffer of size 0, which gets capped at 16 bytes |
no test coverage detected