MCPcopy
hub / github.com/grafana/tempo / waitForCatchUp

Method waitForCatchUp

modules/livestore/live_store.go:513–576  ·  view source on GitHub ↗
(ctx context.Context)

Source from the content-addressed store, hash-verified

511}
512
513func (s *LiveStore) waitForCatchUp(ctx context.Context) error {
514 // If disabled (ReadinessTargetLag == 0), mark ready immediately
515 // This preserves backward compatibility
516 if s.cfg.ReadinessTargetLag == 0 {
517 level.Info(s.logger).Log("msg", "catch-up waiting disabled (readiness_target_lag=0)")
518 return nil
519 }
520
521 startTime := time.Now()
522
523 ticker := time.NewTicker(time.Second) // Check every second
524 defer ticker.Stop()
525
526 level.Info(s.logger).Log(
527 "msg", "waiting for Kafka catch-up",
528 "target_lag", s.cfg.ReadinessTargetLag,
529 "max_wait", s.cfg.ReadinessMaxWait,
530 )
531
532 for {
533 select {
534 case <-ctx.Done():
535 return ctx.Err()
536 case <-ticker.C:
537 elapsed := time.Since(startTime)
538
539 // Check max wait timeout
540 if elapsed >= s.cfg.ReadinessMaxWait {
541 level.Warn(s.logger).Log(
542 "msg", "max catch-up wait exceeded, proceeding anyway",
543 "elapsed", elapsed,
544 "max_wait", s.cfg.ReadinessMaxWait,
545 )
546 metricCatchUpDuration.Set(elapsed.Seconds())
547 return nil
548 }
549
550 // Calculate current lag
551 lag := s.calculateTimeLag(1000)
552 if lag == nil {
553 level.Debug(s.logger).Log("msg", "catch-up lag could not be determined, waiting")
554 continue
555 }
556
557 level.Debug(s.logger).Log(
558 "msg", "catch-up progress",
559 "current_lag", *lag,
560 "target_lag", s.cfg.ReadinessTargetLag,
561 "elapsed", elapsed,
562 )
563
564 if *lag <= s.cfg.ReadinessTargetLag {
565 level.Info(s.logger).Log(
566 "msg", "caught up with Kafka",
567 "final_lag", *lag,
568 "target_lag", s.cfg.ReadinessTargetLag,
569 "elapsed", elapsed,
570 )

Callers 1

Calls 6

calculateTimeLagMethod · 0.95
LogMethod · 0.65
NowMethod · 0.65
StopMethod · 0.65
DoneMethod · 0.65
SetMethod · 0.65

Tested by

no test coverage detected