| 122 | } |
| 123 | |
| 124 | func (s *BackendScheduler) starting(ctx context.Context) error { |
| 125 | level.Info(log.Logger).Log("msg", "backend scheduler starting") |
| 126 | |
| 127 | if s.cfg.Poll { |
| 128 | s.store.EnablePolling(ctx, blocklist.OwnsNothingSharder, true) |
| 129 | } |
| 130 | |
| 131 | err := s.loadWorkCache(ctx) |
| 132 | if err != nil && !errors.Is(err, backend.ErrDoesNotExist) { |
| 133 | return fmt.Errorf("failed to load work cache: %w", err) |
| 134 | } |
| 135 | |
| 136 | // Load the batch manifest (best-effort; missing file means no active redaction batches). |
| 137 | if err := s.work.LoadBatchesFromLocal(ctx, s.cfg.LocalWorkPath); err != nil { |
| 138 | level.Info(log.Logger).Log("msg", "no batch manifest found at startup", "err", err) |
| 139 | } |
| 140 | |
| 141 | wg := sync.WaitGroup{} |
| 142 | |
| 143 | for i := range s.providers { |
| 144 | s.providers[i].jobs = s.providers[i].provider.Start(ctx) |
| 145 | |
| 146 | wg.Add(1) |
| 147 | // Start a goroutine to forward jobs from each provider to the merged channel |
| 148 | go func(jobs <-chan *work.Job) { |
| 149 | defer wg.Done() |
| 150 | |
| 151 | var job *work.Job |
| 152 | |
| 153 | for { |
| 154 | select { |
| 155 | case job = <-jobs: |
| 156 | case <-ctx.Done(): |
| 157 | level.Info(log.Logger).Log("msg", "stopping provider", "provider", i) |
| 158 | return |
| 159 | } |
| 160 | |
| 161 | select { |
| 162 | case s.mergedJobs <- job: |
| 163 | metricProviderJobsMerged.WithLabelValues(strconv.Itoa(i)).Inc() |
| 164 | case <-ctx.Done(): |
| 165 | level.Info(log.Logger).Log("msg", "stopping provider", "provider", i) |
| 166 | return |
| 167 | } |
| 168 | } |
| 169 | }(s.providers[i].jobs) |
| 170 | } |
| 171 | |
| 172 | // Start a goroutine to close the merged channel when all providers are done |
| 173 | go func() { |
| 174 | wg.Wait() |
| 175 | level.Info(log.Logger).Log("msg", "all providers stopped") |
| 176 | close(s.mergedJobs) |
| 177 | }() |
| 178 | |
| 179 | return nil |
| 180 | } |
| 181 | |