MCPcopy
hub / github.com/grafana/tempo / processJobs

Method processJobs

modules/backendworker/backendworker.go:230–274  ·  view source on GitHub ↗
(ctx context.Context)

Source from the content-addressed store, hash-verified

228}
229
230func (w *BackendWorker) processJobs(ctx context.Context) error {
231 var (
232 resp *tempopb.NextJobResponse
233 err error
234 )
235
236 // Request next job
237 err = w.callSchedulerWithBackoff(ctx, func(ctx context.Context) error {
238 var funcErr error
239 resp, funcErr = w.backendScheduler.Next(ctx, &tempopb.NextJobRequest{
240 WorkerId: w.workerID,
241 })
242 if funcErr != nil {
243 if errStatus, ok := status.FromError(funcErr); ok {
244 if errStatus.Code() == codes.NotFound {
245 return errStatus.Err()
246 }
247 }
248
249 return fmt.Errorf("error getting next job: %w", funcErr)
250 }
251
252 return nil
253 })
254 if err != nil {
255 return fmt.Errorf("failed processing jobs: %w", err)
256 }
257
258 if resp == nil || resp.JobId == "" {
259 return fmt.Errorf("no jobs available")
260 }
261
262 metricWorkerJobsTotal.WithLabelValues().Inc()
263
264 switch resp.Type {
265 case tempopb.JobType_JOB_TYPE_COMPACTION:
266 return w.processCompactionJob(ctx, resp)
267 case tempopb.JobType_JOB_TYPE_RETENTION:
268 return w.processRetentionJob(ctx, resp)
269 case tempopb.JobType_JOB_TYPE_REDACTION:
270 return w.processRedactionJob(ctx, resp)
271 default:
272 return fmt.Errorf("unknown job type: %s", resp.Type.String())
273 }
274}
275
276func (w *BackendWorker) processCompactionJob(ctx context.Context, resp *tempopb.NextJobResponse) error {
277 if resp.Detail.Tenant == "" {

Callers 2

runningMethod · 0.95
TestWorkerFunction · 0.80

Calls 7

processCompactionJobMethod · 0.95
processRetentionJobMethod · 0.95
processRedactionJobMethod · 0.95
NextMethod · 0.65
IncMethod · 0.65
StringMethod · 0.45

Tested by 1

TestWorkerFunction · 0.64