| 180 | } |
| 181 | |
| 182 | func (s *BackendScheduler) running(ctx context.Context) error { |
| 183 | level.Info(log.Logger).Log("msg", "backend scheduler running") |
| 184 | |
| 185 | maintenanceTicker := time.NewTicker(s.cfg.MaintenanceInterval) |
| 186 | defer maintenanceTicker.Stop() |
| 187 | |
| 188 | backendFlushTicker := time.NewTicker(s.cfg.BackendFlushInterval) |
| 189 | defer backendFlushTicker.Stop() |
| 190 | |
| 191 | var err error |
| 192 | |
| 193 | for { |
| 194 | select { |
| 195 | case <-ctx.Done(): |
| 196 | return nil |
| 197 | case <-maintenanceTicker.C: |
| 198 | s.work.Prune(ctx) |
| 199 | s.checkPendingRescans(ctx) |
| 200 | case <-backendFlushTicker.C: |
| 201 | err = s.flushWorkCacheToBackend(ctx) |
| 202 | metricWorkFlushes.Inc() |
| 203 | if err != nil && !errors.Is(err, context.Canceled) { |
| 204 | metricWorkFlushesFailed.Inc() |
| 205 | level.Error(log.Logger).Log("msg", "failed to flush work cache to backend", "error", err) |
| 206 | } |
| 207 | |
| 208 | } |
| 209 | } |
| 210 | } |
| 211 | |
| 212 | func (s *BackendScheduler) stopping(_ error) error { |
| 213 | err := s.work.FlushToLocal(context.Background(), s.cfg.LocalWorkPath, nil) // flush all shards |