MCPcopy
hub / github.com/segmentio/kafka-go / readMessage

Method readMessage

message_reader.go:122–143  ·  view source on GitHub ↗
(min int64, key readBytesFunc, val readBytesFunc)

Source from the content-addressed store, hash-verified

120}
121
122func (r *messageSetReader) readMessage(min int64, key readBytesFunc, val readBytesFunc) (
123 offset int64, lastOffset int64, timestamp int64, headers []Header, err error) {
124
125 if r.empty {
126 err = RequestTimedOut
127 return
128 }
129 if err = r.readHeader(); err != nil {
130 return
131 }
132 switch r.header.magic {
133 case 0, 1:
134 offset, timestamp, headers, err = r.readMessageV1(min, key, val)
135 // Set an invalid value so that it can be ignored
136 lastOffset = -1
137 case 2:
138 offset, lastOffset, timestamp, headers, err = r.readMessageV2(min, key, val)
139 default:
140 err = r.header.badMagic()
141 }
142 return
143}
144
145func (r *messageSetReader) readMessageV1(min int64, key readBytesFunc, val readBytesFunc) (
146 offset int64, timestamp int64, headers []Header, err error) {

Callers 1

Calls 4

readHeaderMethod · 0.95
readMessageV1Method · 0.95
readMessageV2Method · 0.95
badMagicMethod · 0.80

Tested by 1