MCPcopy
hub / github.com/segmentio/kafka-go / readMessage

Method readMessage

batch.go:239–304  ·  view source on GitHub ↗
(
	key func(*bufio.Reader, int, int) (int, error),
	val func(*bufio.Reader, int, int) (int, error),
)

Source from the content-addressed store, hash-verified

237}
238
239func (batch *Batch) readMessage(
240 key func(*bufio.Reader, int, int) (int, error),
241 val func(*bufio.Reader, int, int) (int, error),
242) (offset int64, timestamp int64, headers []Header, err error) {
243 if err = batch.err; err != nil {
244 return
245 }
246
247 var lastOffset int64
248 offset, lastOffset, timestamp, headers, err = batch.msgs.readMessage(batch.offset, key, val)
249 switch {
250 case err == nil:
251 batch.offset = offset + 1
252 batch.lastOffset = lastOffset
253 case errors.Is(err, errShortRead):
254 // As an "optimization" kafka truncates the returned response after
255 // producing MaxBytes, which could then cause the code to return
256 // errShortRead.
257 err = batch.msgs.discard()
258 switch {
259 case err != nil:
260 // Since io.EOF is used by the batch to indicate that there are
261 // no more messages to consume, it is crucial that any io.EOF errors
262 // on the underlying connection are repackaged. Otherwise, the
263 // caller can't tell the difference between a batch that was fully
264 // consumed or a batch whose connection is in an error state.
265 batch.err = dontExpectEOF(err)
266 case batch.msgs.remaining() == 0:
267 // Because we use the adjusted deadline we could end up returning
268 // before the actual deadline occurred. This is necessary otherwise
269 // timing out the connection for real could end up leaving it in an
270 // unpredictable state, which would require closing it.
271 // This design decision was made to maximize the chances of keeping
272 // the connection open, the trade off being to lose precision on the
273 // read deadline management.
274 err = checkTimeoutErr(batch.deadline)
275 batch.err = err
276
277 // Checks the following:
278 // - `batch.err` for a "success" from the previous timeout check
279 // - `batch.msgs.lengthRemain` to ensure that this EOF is not due
280 // to MaxBytes truncation
281 // - `batch.lastOffset` to ensure that the message format contains
282 // `lastOffset`
283 if errors.Is(batch.err, io.EOF) && batch.msgs.lengthRemain == 0 && batch.lastOffset != -1 {
284 // Log compaction can create batches that end with compacted
285 // records so the normal strategy that increments the "next"
286 // offset as records are read doesn't work as the compacted
287 // records are "missing" and never get "read".
288 //
289 // In order to reliably reach the next non-compacted offset we
290 // jump past the saved lastOffset.
291 batch.offset = batch.lastOffset + 1
292 }
293 }
294 default:
295 // Since io.EOF is used by the batch to indicate that there are
296 // no more messages to consume, it is crucial that any io.EOF errors

Callers 2

ReadMethod · 0.95
ReadMessageMethod · 0.95

Calls 4

checkTimeoutErrFunction · 0.85
remainingMethod · 0.80
dontExpectEOFFunction · 0.70
discardMethod · 0.45

Tested by

no test coverage detected