MCPcopy
hub / github.com/segmentio/kafka-go / readMessageV1

Method readMessageV1

message_reader.go:145–250  ·  view source on GitHub ↗
(min int64, key readBytesFunc, val readBytesFunc)

Source from the content-addressed store, hash-verified

143}
144
145func (r *messageSetReader) readMessageV1(min int64, key readBytesFunc, val readBytesFunc) (
146 offset int64, timestamp int64, headers []Header, err error) {
147
148 for r.readerStack != nil {
149 if r.remain == 0 {
150 r.readerStack = r.parent
151 continue
152 }
153 if err = r.readHeader(); err != nil {
154 return
155 }
156 offset = r.header.firstOffset
157 timestamp = r.header.v1.timestamp
158 var codec CompressionCodec
159 if codec, err = r.header.compression(); err != nil {
160 return
161 }
162 if r.debug {
163 r.log("Reading with codec=%T", codec)
164 }
165 if codec != nil {
166 // discard next four bytes...will be -1 to indicate null key
167 if err = r.discardN(4); err != nil {
168 return
169 }
170
171 // read and decompress the contained message set.
172 r.decompressed.Reset()
173 if err = r.readBytesWith(func(br *bufio.Reader, sz int, n int) (remain int, err error) {
174 // x4 as a guess that the average compression ratio is near 75%
175 r.decompressed.Grow(4 * n)
176 limitReader := io.LimitedReader{R: br, N: int64(n)}
177 codecReader := codec.NewReader(&limitReader)
178 _, err = r.decompressed.ReadFrom(codecReader)
179 remain = sz - (n - int(limitReader.N))
180 codecReader.Close()
181 return
182 }); err != nil {
183 return
184 }
185
186 // the compressed message's offset will be equal to the offset of
187 // the last message in the set. within the compressed set, the
188 // offsets will be relative, so we have to scan through them to
189 // get the base offset. for example, if there are four compressed
190 // messages at offsets 10-13, then the container message will have
191 // offset 13 and the contained messages will be 0,1,2,3. the base
192 // offset for the container, then is 13-3=10.
193 if offset, err = extractOffset(offset, r.decompressed.Bytes()); err != nil {
194 return
195 }
196
197 // mark the outer message as being read
198 r.markRead()
199
200 // then push the decompressed bytes onto the stack.
201 r.readerStack = &readerStack{
202 // Allocate a buffer of size 0, which gets capped at 16 bytes

Callers 1

readMessage · Method · 0.95

Calls 14

readHeader · Method · 0.95
log · Method · 0.95
discardN · Method · 0.95
readBytesWith · Method · 0.95
Close · Method · 0.95
markRead · Method · 0.95
discardBytes · Method · 0.95
extractOffset · Function · 0.85
compression · Method · 0.80
NewReader · Method · 0.65
Bytes · Method · 0.65
Len · Method · 0.65

Tested by

no test coverage detected