MCPcopy
hub / github.com/segmentio/kafka-go / readFromVersion1

Method readFromVersion1

protocol/record_v1.go:55–150  ·  view source on GitHub ↗
(d *decoder)

Source from the content-addressed store, hash-verified

53}
54
55func (rs *RecordSet) readFromVersion1(d *decoder) error {
56 var records RecordReader
57
58 b := newPageBuffer()
59 defer b.unref()
60
61 attributes, baseOffset, timestamp, key, value, err := readMessage(b, d)
62 if err != nil {
63 return err
64 }
65
66 if compression := Attributes(attributes).Compression(); compression == 0 {
67 records = &message{
68 Record: Record{
69 Offset: baseOffset,
70 Time: makeTime(timestamp),
71 Key: key,
72 Value: value,
73 },
74 }
75 } else {
76 // Can we have a non-nil key when reading a compressed message?
77 if key != nil {
78 key.Close()
79 }
80 if value == nil {
81 records = emptyRecordReader{}
82 } else {
83 defer value.Close()
84
85 codec := compression.Codec()
86 if codec == nil {
87 return Errorf("unsupported compression codec: %d", compression)
88 }
89 decompressor := codec.NewReader(value)
90 defer decompressor.Close()
91
92 b := newPageBuffer()
93 defer b.unref()
94
95 d := &decoder{
96 reader: decompressor,
97 remain: math.MaxInt32,
98 }
99
100 r := &recordReader{
101 records: make([]Record, 0, 32),
102 }
103
104 for !d.done() {
105 _, offset, timestamp, key, value, err := readMessage(b, d)
106 if err != nil {
107 if errors.Is(err, io.ErrUnexpectedEOF) {
108 break
109 }
110 for _, rec := range r.records {
111 closeBytes(rec.Key)
112 closeBytes(rec.Value)

Callers 1

ReadFrom · Method · 0.95

Calls 13

Close · Method · 0.95
done · Method · 0.95
newPageBuffer · Function · 0.85
readMessage · Function · 0.85
Attributes · TypeAlias · 0.85
Errorf · Function · 0.85
closeBytes · Function · 0.85
Compression · Method · 0.80
Codec · Method · 0.80
makeTime · Function · 0.70
NewReader · Method · 0.65
unref · Method · 0.45

Tested by

no test coverage detected