(_ int64, key readBytesFunc, val readBytesFunc)
| 250 | } |
| 251 | |
| 252 | func (r *messageSetReader) readMessageV2(_ int64, key readBytesFunc, val readBytesFunc) ( |
| 253 | offset int64, lastOffset int64, timestamp int64, headers []Header, err error) { |
| 254 | if err = r.readHeader(); err != nil { |
| 255 | return |
| 256 | } |
| 257 | if r.count == int(r.header.v2.count) { // first time reading this set, so check for compression headers. |
| 258 | var codec CompressionCodec |
| 259 | if codec, err = r.header.compression(); err != nil { |
| 260 | return |
| 261 | } |
| 262 | if codec != nil { |
| 263 | batchRemain := int(r.header.length - 49) // TODO: document this magic number |
| 264 | if batchRemain > r.remain { |
| 265 | err = errShortRead |
| 266 | return |
| 267 | } |
| 268 | if batchRemain < 0 { |
| 269 | err = fmt.Errorf("batch remain < 0 (%d)", batchRemain) |
| 270 | return |
| 271 | } |
| 272 | r.decompressed.Reset() |
| 273 | // x4 as a guess that the average compression ratio is near 75% |
| 274 | r.decompressed.Grow(4 * batchRemain) |
| 275 | limitReader := io.LimitedReader{R: r.reader, N: int64(batchRemain)} |
| 276 | codecReader := codec.NewReader(&limitReader) |
| 277 | _, err = r.decompressed.ReadFrom(codecReader) |
| 278 | codecReader.Close() |
| 279 | if err != nil { |
| 280 | return |
| 281 | } |
| 282 | r.remain -= batchRemain - int(limitReader.N) |
| 283 | r.readerStack = &readerStack{ |
| 284 | reader: bufio.NewReaderSize(r.decompressed, 0), // the new stack reads from the decompressed buffer |
| 285 | remain: r.decompressed.Len(), |
| 286 | base: -1, // base is unused here |
| 287 | parent: r.readerStack, |
| 288 | header: r.header, |
| 289 | count: r.count, |
| 290 | } |
| 291 | // all of the messages in this set are in the decompressed set just pushed onto the reader |
| 292 | // stack. here we set the parent count to 0 so that when the child set is exhausted, the |
| 293 | // reader will then try to read the header of the next message set |
| 294 | r.readerStack.parent.count = 0 |
| 295 | } |
| 296 | } |
| 297 | remainBefore := r.remain |
| 298 | var length int64 |
| 299 | if err = r.readVarInt(&length); err != nil { |
| 300 | return |
| 301 | } |
| 302 | lengthOfLength := remainBefore - r.remain |
| 303 | var attrs int8 |
| 304 | if err = r.readInt8(&attrs); err != nil { |
| 305 | return |
| 306 | } |
| 307 | var timestampDelta int64 |
| 308 | if err = r.readVarInt(×tampDelta); err != nil { |
| 309 | return |
no test coverage detected