Calculate lag based on parameters populated by PartitionReader. Edge cases: - empty partition = no lag - nothing has been fetched yet = indeterminate - we know the watermark but nothing has been consumed yet = indeterminate. It takes lagShortcutThreshold to shortcut calculations if the lag is close to the end of the partition.
(lagShortcutThreshold int64)
| 584 | // It takes lagShortcutThreshold to shortcut calculations if the lag is close to the end of the partition. |
| 585 | // To disable the shortcut, set lagShortcutThreshold to a negative value. |
| 586 | func (s *LiveStore) calculateTimeLag(lagShortcutThreshold int64) *time.Duration { |
| 587 | // reader is nil only before starting() creates it. After stopping(), reader |
| 588 | // is a stopped service but not nil, and its atomic fields remain safe to read. |
| 589 | if s.reader == nil { |
| 590 | level.Debug(s.logger).Log("msg", "Partition reader not initialized") |
| 591 | return nil |
| 592 | } |
| 593 | |
| 594 | // Use cached high watermark from fetch responses (avoids extra API call) |
| 595 | lag := s.reader.lag.Load() |
| 596 | |
| 597 | // If we haven't performed any fetches yet, we can't determine lag |
| 598 | if lag < 0 { |
| 599 | level.Debug(s.logger).Log("msg", "High watermark not set yet") |
| 600 | return nil |
| 601 | } |
| 602 | |
| 603 | // Check if we are near end or partition is empty |
| 604 | // Arbitrary value picked to shortcut calculations |
| 605 | if lagShortcutThreshold >= 0 && lag <= lagShortcutThreshold { |
| 606 | level.Debug(s.logger).Log( |
| 607 | "msg", "At or close to partition end", |
| 608 | "lag", lag) |
| 609 | return new(time.Duration(0)) |
| 610 | } |
| 611 | |
| 612 | nanos := s.lastRecordTimeNanos.Load() |
| 613 | if nanos < 0 { |
| 614 | level.Debug(s.logger).Log("msg", "No last record yet") |
| 615 | // Haven't consumed records - check if offset at end |
| 616 | return nil // Not caught up yet, can't determine lag |
| 617 | } |
| 618 | |
| 619 | // Potential race condition that can result in negative lag? |
| 620 | // Assuming strictly monotonic timestamps in Kafka, can't cause an issue |
| 621 | lastRecordTime := time.Unix(0, nanos) |
| 622 | return new(time.Since(lastRecordTime)) |
| 623 | } |
| 624 | |
| 625 | func (s *LiveStore) consume(ctx context.Context, rs recordIter, now time.Time) (*kadm.Offset, error) { |
| 626 | defer s.decoder.Reset() |
no test coverage detected