MCPcopy
hub / github.com/grafana/tempo / calculateTimeLag

Method calculateTimeLag

modules/livestore/live_store.go:586–623  ·  modules/livestore/live_store.go::LiveStore.calculateTimeLag

Calculate lag based on parameters populated by PartitionReader Edge cases: - empty partition = no lag - nothing has been fetched yet = indeterminate - we know the watermark but nothing has been consumed yet = indeterminate It takes lagShortcutThreshold to shortcut calculations if the lag is close t

(lagShortcutThreshold int64)

Source from the content-addressed store, hash-verified

584// It takes lagShortcutThreshold to shortcut calculations if the lag is close to the end of the partition.
585// To disable the shortcut, set lagShortcutThreshold to a negative value.
586func (s *LiveStore) calculateTimeLag(lagShortcutThreshold int64) *time.Duration {
587 // reader is nil only before starting() creates it. After stopping(), reader
588 // is a stopped service but not nil, and its atomic fields remain safe to read.
589 if s.reader == nil {
590 level.Debug(s.logger).Log("msg", "Partition reader not initialized")
591 return nil
592 }
593
594 // Use cached high watermark from fetch responses (avoids extra API call)
595 lag := s.reader.lag.Load()
596
597 // If we haven't performed any fetches yet, we can't determine lag
598 if lag < 0 {
599 level.Debug(s.logger).Log("msg", "High watermark not set yet")
600 return nil
601 }
602
603 // Check if we are near end or partition is empty
604 // Arbitrary value picked to shortcut calculations
605 if lagShortcutThreshold >= 0 && lag <= lagShortcutThreshold {
606 level.Debug(s.logger).Log(
607 "msg", "At or close to partition end",
608 "lag", lag)
609 return new(time.Duration(0))
610 }
611
612 nanos := s.lastRecordTimeNanos.Load()
613 if nanos < 0 {
614 level.Debug(s.logger).Log("msg", "No last record yet")
615 // Haven't consumed records - check if offset at end
616 return nil // Not caught up yet, can't determine lag
617 }
618
619 // Potential race condition that can result in negative lag?
620 // Assuming strictly monotonic timestamps in Kafka, can't cause an issue
621 lastRecordTime := time.Unix(0, nanos)
622 return new(time.Since(lastRecordTime))
623}
624
625func (s *LiveStore) consume(ctx context.Context, rs recordIter, now time.Time) (*kadm.Offset, error) {
626 defer s.decoder.Reset()

Callers 2

waitForCatchUpMethod · 0.95
isLaggedMethod · 0.95

Calls 1

LogMethod · 0.65

Tested by

no test coverage detected