(ctx context.Context)
| 511 | } |
| 512 | |
| 513 | func (s *LiveStore) waitForCatchUp(ctx context.Context) error { |
| 514 | // If disabled (ReadinessTargetLag == 0), mark ready immediately |
| 515 | // This preserves backward compatibility |
| 516 | if s.cfg.ReadinessTargetLag == 0 { |
| 517 | level.Info(s.logger).Log("msg", "catch-up waiting disabled (readiness_target_lag=0)") |
| 518 | return nil |
| 519 | } |
| 520 | |
| 521 | startTime := time.Now() |
| 522 | |
| 523 | ticker := time.NewTicker(time.Second) // Check every second |
| 524 | defer ticker.Stop() |
| 525 | |
| 526 | level.Info(s.logger).Log( |
| 527 | "msg", "waiting for Kafka catch-up", |
| 528 | "target_lag", s.cfg.ReadinessTargetLag, |
| 529 | "max_wait", s.cfg.ReadinessMaxWait, |
| 530 | ) |
| 531 | |
| 532 | for { |
| 533 | select { |
| 534 | case <-ctx.Done(): |
| 535 | return ctx.Err() |
| 536 | case <-ticker.C: |
| 537 | elapsed := time.Since(startTime) |
| 538 | |
| 539 | // Check max wait timeout |
| 540 | if elapsed >= s.cfg.ReadinessMaxWait { |
| 541 | level.Warn(s.logger).Log( |
| 542 | "msg", "max catch-up wait exceeded, proceeding anyway", |
| 543 | "elapsed", elapsed, |
| 544 | "max_wait", s.cfg.ReadinessMaxWait, |
| 545 | ) |
| 546 | metricCatchUpDuration.Set(elapsed.Seconds()) |
| 547 | return nil |
| 548 | } |
| 549 | |
| 550 | // Calculate current lag |
| 551 | lag := s.calculateTimeLag(1000) |
| 552 | if lag == nil { |
| 553 | level.Debug(s.logger).Log("msg", "catch-up lag could not be determined, waiting") |
| 554 | continue |
| 555 | } |
| 556 | |
| 557 | level.Debug(s.logger).Log( |
| 558 | "msg", "catch-up progress", |
| 559 | "current_lag", *lag, |
| 560 | "target_lag", s.cfg.ReadinessTargetLag, |
| 561 | "elapsed", elapsed, |
| 562 | ) |
| 563 | |
| 564 | if *lag <= s.cfg.ReadinessTargetLag { |
| 565 | level.Info(s.logger).Log( |
| 566 | "msg", "caught up with Kafka", |
| 567 | "final_lag", *lag, |
| 568 | "target_lag", s.cfg.ReadinessTargetLag, |
| 569 | "elapsed", elapsed, |
| 570 | ) |
no test coverage detected