| 228 | } |
| 229 | |
| 230 | func (w *BackendWorker) processJobs(ctx context.Context) error { |
| 231 | var ( |
| 232 | resp *tempopb.NextJobResponse |
| 233 | err error |
| 234 | ) |
| 235 | |
| 236 | // Request next job |
| 237 | err = w.callSchedulerWithBackoff(ctx, func(ctx context.Context) error { |
| 238 | var funcErr error |
| 239 | resp, funcErr = w.backendScheduler.Next(ctx, &tempopb.NextJobRequest{ |
| 240 | WorkerId: w.workerID, |
| 241 | }) |
| 242 | if funcErr != nil { |
| 243 | if errStatus, ok := status.FromError(funcErr); ok { |
| 244 | if errStatus.Code() == codes.NotFound { |
| 245 | return errStatus.Err() |
| 246 | } |
| 247 | } |
| 248 | |
| 249 | return fmt.Errorf("error getting next job: %w", funcErr) |
| 250 | } |
| 251 | |
| 252 | return nil |
| 253 | }) |
| 254 | if err != nil { |
| 255 | return fmt.Errorf("failed processing jobs: %w", err) |
| 256 | } |
| 257 | |
| 258 | if resp == nil || resp.JobId == "" { |
| 259 | return fmt.Errorf("no jobs available") |
| 260 | } |
| 261 | |
| 262 | metricWorkerJobsTotal.WithLabelValues().Inc() |
| 263 | |
| 264 | switch resp.Type { |
| 265 | case tempopb.JobType_JOB_TYPE_COMPACTION: |
| 266 | return w.processCompactionJob(ctx, resp) |
| 267 | case tempopb.JobType_JOB_TYPE_RETENTION: |
| 268 | return w.processRetentionJob(ctx, resp) |
| 269 | case tempopb.JobType_JOB_TYPE_REDACTION: |
| 270 | return w.processRedactionJob(ctx, resp) |
| 271 | default: |
| 272 | return fmt.Errorf("unknown job type: %s", resp.Type.String()) |
| 273 | } |
| 274 | } |
| 275 | |
| 276 | func (w *BackendWorker) processCompactionJob(ctx context.Context, resp *tempopb.NextJobResponse) error { |
| 277 | if resp.Detail.Tenant == "" { |