(ctx context.Context)
| 181 | } |
| 182 | |
| 183 | func (w *BackendWorker) running(ctx context.Context) error { |
| 184 | level.Info(log.Logger).Log("msg", "backend worker running") |
| 185 | |
| 186 | b := backoff.New(ctx, w.cfg.Backoff) |
| 187 | |
| 188 | jobCtx := ctx |
| 189 | if w.cfg.FinishOnShutdownTimeout > 0 { |
| 190 | var jobsCancel context.CancelFunc |
| 191 | jobCtx, jobsCancel = createShutdownContext(ctx, w.cfg.FinishOnShutdownTimeout) |
| 192 | defer jobsCancel() |
| 193 | } |
| 194 | |
| 195 | if w.subservices != nil { |
| 196 | for { |
| 197 | select { |
| 198 | case <-ctx.Done(): |
| 199 | return nil |
| 200 | case err := <-w.subservicesWatcher.Chan(): |
| 201 | return fmt.Errorf("worker subservices failed: %w", err) |
| 202 | default: |
| 203 | if err := w.processJobs(jobCtx); err != nil { |
| 204 | level.Error(log.Logger).Log("msg", "error processing jobs", "err", err, "backoff", b.NextDelay()) |
| 205 | b.Wait() |
| 206 | continue |
| 207 | } |
| 208 | |
| 209 | b.Reset() |
| 210 | } |
| 211 | } |
| 212 | } else { |
| 213 | for { |
| 214 | select { |
| 215 | case <-ctx.Done(): |
| 216 | return nil |
| 217 | default: |
| 218 | if err := w.processJobs(jobCtx); err != nil { |
| 219 | level.Error(log.Logger).Log("msg", "error processing jobs", "err", err, "backoff", b.NextDelay()) |
| 220 | b.Wait() |
| 221 | continue |
| 222 | } |
| 223 | |
| 224 | b.Reset() |
| 225 | } |
| 226 | } |
| 227 | } |
| 228 | } |
| 229 | |
| 230 | func (w *BackendWorker) processJobs(ctx context.Context) error { |
| 231 | var ( |
nothing calls this directly
no test coverage detected