shouldForceFromLookback decides whether to re-read the Kafka lookback to rebuild query state. Skipped when local data exists, or when the partition is Inactive (prior pod already drained).
(ctx context.Context)
| 375 | // shouldForceFromLookback decides whether to re-read the Kafka lookback to rebuild query state. |
| 376 | // Skipped when local data exists, or when the partition is Inactive (prior pod already drained). |
| 377 | func (s *LiveStore) shouldForceFromLookback(ctx context.Context) bool { |
| 378 | if len(s.getInstances()) > 0 { |
| 379 | return false |
| 380 | } |
| 381 | state, _, err := s.ingestPartitionLifecycler.GetPartitionState(ctx) |
| 382 | if err != nil { |
| 383 | level.Warn(s.logger).Log("msg", "failed to read partition state, defaulting to lookback replay", "err", err) |
| 384 | return true |
| 385 | } |
| 386 | if state == ring.PartitionInactive { |
| 387 | level.Info(s.logger).Log("msg", "skipping lookback replay because partition is Inactive") |
| 388 | return false |
| 389 | } |
| 390 | level.Info(s.logger).Log("msg", "no local data found after reload, will force reading from lookback period") |
| 391 | return true |
| 392 | } |
| 393 | |
| 394 | func (s *LiveStore) startKafkaIngestPath(ctx context.Context) error { |
| 395 | forceFromLookback := s.shouldForceFromLookback(ctx) |