(ctx context.Context)
| 355 | } |
| 356 | |
| 357 | func (s *LiveStore) waitForIngestPathReady(ctx context.Context) error { |
| 358 | if !s.cfg.ConsumeFromKafka { |
| 359 | return nil |
| 360 | } |
| 361 | // Wait for catch-up before marking ready (if enabled) |
| 362 | if err := s.waitForCatchUp(ctx); err != nil { |
| 363 | return fmt.Errorf("failed to catch up: %w", err) |
| 364 | } |
| 365 | return nil |
| 366 | } |
| 367 | |
| 368 | func (s *LiveStore) startIngestPath(ctx context.Context) error { |
| 369 | if !s.cfg.ConsumeFromKafka { |
no test coverage detected