MCPcopy
hub / github.com/segmentio/kafka-go / readFromVersion2

Method readFromVersion2

protocol/record_v2.go:10–189  ·  view source on GitHub ↗
(d *decoder)

Source from the content-addressed store, hash-verified

8)
9
10func (rs *RecordSet) readFromVersion2(d *decoder) error {
11 baseOffset := d.readInt64()
12 batchLength := d.readInt32()
13
14 if int(batchLength) > d.remain || d.err != nil {
15 d.discardAll()
16 return nil
17 }
18
19 dec := &decoder{
20 reader: d,
21 remain: int(batchLength),
22 }
23
24 partitionLeaderEpoch := dec.readInt32()
25 magicByte := dec.readInt8()
26 crc := dec.readInt32()
27
28 dec.setCRC(crc32.MakeTable(crc32.Castagnoli))
29
30 attributes := dec.readInt16()
31 lastOffsetDelta := dec.readInt32()
32 firstTimestamp := dec.readInt64()
33 maxTimestamp := dec.readInt64()
34 producerID := dec.readInt64()
35 producerEpoch := dec.readInt16()
36 baseSequence := dec.readInt32()
37 numRecords := dec.readInt32()
38 reader := io.Reader(dec)
39
40 // unused
41 _ = lastOffsetDelta
42 _ = maxTimestamp
43
44 if compression := Attributes(attributes).Compression(); compression != 0 {
45 codec := compression.Codec()
46 if codec == nil {
47 return fmt.Errorf("unsupported compression codec (%d)", compression)
48 }
49 decompressor := codec.NewReader(reader)
50 defer decompressor.Close()
51 reader = decompressor
52 }
53
54 buffer := newPageBuffer()
55 defer buffer.unref()
56
57 _, err := buffer.ReadFrom(reader)
58 if err != nil {
59 return err
60 }
61 if dec.crc32 != uint32(crc) {
62 return fmt.Errorf("crc32 checksum mismatch (computed=%d found=%d)", dec.crc32, uint32(crc))
63 }
64
65 recordsLength := buffer.Len()
66 dec.reader = buffer
67 dec.remain = recordsLength

Callers 1

ReadFromMethod · 0.95

Calls 15

readInt64Method · 0.95
readInt32Method · 0.95
readInt8Method · 0.95
setCRCMethod · 0.95
readInt16Method · 0.95
CloseMethod · 0.95
readVarIntMethod · 0.95
discardMethod · 0.95
readVarStringMethod · 0.95
readVarBytesMethod · 0.95
newPageRefMethod · 0.95
AttributesTypeAlias · 0.85

Tested by

no test coverage detected