( key func(*bufio.Reader, int, int) (int, error), val func(*bufio.Reader, int, int) (int, error), )
| 237 | } |
| 238 | |
| 239 | func (batch *Batch) readMessage( |
| 240 | key func(*bufio.Reader, int, int) (int, error), |
| 241 | val func(*bufio.Reader, int, int) (int, error), |
| 242 | ) (offset int64, timestamp int64, headers []Header, err error) { |
| 243 | if err = batch.err; err != nil { |
| 244 | return |
| 245 | } |
| 246 | |
| 247 | var lastOffset int64 |
| 248 | offset, lastOffset, timestamp, headers, err = batch.msgs.readMessage(batch.offset, key, val) |
| 249 | switch { |
| 250 | case err == nil: |
| 251 | batch.offset = offset + 1 |
| 252 | batch.lastOffset = lastOffset |
| 253 | case errors.Is(err, errShortRead): |
| 254 | // As an "optimization" kafka truncates the returned response after |
| 255 | // producing MaxBytes, which could then cause the code to return |
| 256 | // errShortRead. |
| 257 | err = batch.msgs.discard() |
| 258 | switch { |
| 259 | case err != nil: |
| 260 | // Since io.EOF is used by the batch to indicate that there is are |
| 261 | // no more messages to consume, it is crucial that any io.EOF errors |
| 262 | // on the underlying connection are repackaged. Otherwise, the |
| 263 | // caller can't tell the difference between a batch that was fully |
| 264 | // consumed or a batch whose connection is in an error state. |
| 265 | batch.err = dontExpectEOF(err) |
| 266 | case batch.msgs.remaining() == 0: |
| 267 | // Because we use the adjusted deadline we could end up returning |
| 268 | // before the actual deadline occurred. This is necessary otherwise |
| 269 | // timing out the connection for real could end up leaving it in an |
| 270 | // unpredictable state, which would require closing it. |
| 271 | // This design decision was made to maximize the chances of keeping |
| 272 | // the connection open, the trade off being to lose precision on the |
| 273 | // read deadline management. |
| 274 | err = checkTimeoutErr(batch.deadline) |
| 275 | batch.err = err |
| 276 | |
| 277 | // Checks the following: |
| 278 | // - `batch.err` for a "success" from the previous timeout check |
| 279 | // - `batch.msgs.lengthRemain` to ensure that this EOF is not due |
| 280 | // to MaxBytes truncation |
| 281 | // - `batch.lastOffset` to ensure that the message format contains |
| 282 | // `lastOffset` |
| 283 | if errors.Is(batch.err, io.EOF) && batch.msgs.lengthRemain == 0 && batch.lastOffset != -1 { |
| 284 | // Log compaction can create batches that end with compacted |
| 285 | // records so the normal strategy that increments the "next" |
| 286 | // offset as records are read doesn't work as the compacted |
| 287 | // records are "missing" and never get "read". |
| 288 | // |
| 289 | // In order to reliably reach the next non-compacted offset we |
| 290 | // jump past the saved lastOffset. |
| 291 | batch.offset = batch.lastOffset + 1 |
| 292 | } |
| 293 | } |
| 294 | default: |
| 295 | // Since io.EOF is used by the batch to indicate that there is are |
| 296 | // no more messages to consume, it is crucial that any io.EOF errors |
no test coverage detected