MCPcopy
hub / github.com/segmentio/kafka-go / readMessageV2

Method readMessageV2

message_reader.go:252–339  ·  view source on GitHub ↗
(_ int64, key readBytesFunc, val readBytesFunc)

Source from the content-addressed store, hash-verified

250}
251
252func (r *messageSetReader) readMessageV2(_ int64, key readBytesFunc, val readBytesFunc) (
253 offset int64, lastOffset int64, timestamp int64, headers []Header, err error) {
254 if err = r.readHeader(); err != nil {
255 return
256 }
257 if r.count == int(r.header.v2.count) { // first time reading this set, so check for compression headers.
258 var codec CompressionCodec
259 if codec, err = r.header.compression(); err != nil {
260 return
261 }
262 if codec != nil {
263 batchRemain := int(r.header.length - 49) // TODO: document this magic number
264 if batchRemain > r.remain {
265 err = errShortRead
266 return
267 }
268 if batchRemain < 0 {
269 err = fmt.Errorf("batch remain < 0 (%d)", batchRemain)
270 return
271 }
272 r.decompressed.Reset()
273 // x4 as a guess that the average compression ratio is near 75%
274 r.decompressed.Grow(4 * batchRemain)
275 limitReader := io.LimitedReader{R: r.reader, N: int64(batchRemain)}
276 codecReader := codec.NewReader(&limitReader)
277 _, err = r.decompressed.ReadFrom(codecReader)
278 codecReader.Close()
279 if err != nil {
280 return
281 }
282 r.remain -= batchRemain - int(limitReader.N)
283 r.readerStack = &readerStack{
284 reader: bufio.NewReaderSize(r.decompressed, 0), // the new stack reads from the decompressed buffer
285 remain: r.decompressed.Len(),
286 base: -1, // base is unused here
287 parent: r.readerStack,
288 header: r.header,
289 count: r.count,
290 }
291 // all of the messages in this set are in the decompressed set just pushed onto the reader
292 // stack. here we set the parent count to 0 so that when the child set is exhausted, the
293 // reader will then try to read the header of the next message set
294 r.readerStack.parent.count = 0
295 }
296 }
297 remainBefore := r.remain
298 var length int64
299 if err = r.readVarInt(&length); err != nil {
300 return
301 }
302 lengthOfLength := remainBefore - r.remain
303 var attrs int8
304 if err = r.readInt8(&attrs); err != nil {
305 return
306 }
307 var timestampDelta int64
308 if err = r.readVarInt(&timestampDelta); err != nil {
309 return

Callers 1

readMessage (Method) · 0.95

Calls 12

readHeader (Method) · 0.95
Close (Method) · 0.95
readVarInt (Method) · 0.95
readInt8 (Method) · 0.95
runFunc (Method) · 0.95
readMessageHeader (Method) · 0.95
markRead (Method) · 0.95
compression (Method) · 0.80
NewReader (Method) · 0.65
Len (Method) · 0.65
Reset (Method) · 0.45
ReadFrom (Method) · 0.45

Tested by

no test coverage detected