MCPcopy
hub / github.com/grafana/tempo / processRetentionJob

Method processRetentionJob

modules/backendworker/backendworker.go:329–358  ·  view source on GitHub ↗
(ctx context.Context, resp *tempopb.NextJobResponse)

Source from the content-addressed store, hash-verified

327}
328
329func (w *BackendWorker) processRetentionJob(ctx context.Context, resp *tempopb.NextJobResponse) error {
330 tenantID := resp.Detail.Tenant
331 level.Debug(log.Logger).Log("msg", "received retention job", "job_id", resp.JobId, "tenant", tenantID)
332
333 // Per-tenant path (new behaviour): run retention for only the specified tenant.
334 // Fallback path (rollout compatibility): if tenant is empty this is a legacy
335 // global job emitted by an older scheduler binary; retain all tenants as before.
336 if tenantID != "" {
337 w.store.RetainTenantWithConfig(ctx, tenantID, &w.cfg.Compactor, ownsEverythingSharder{}, w)
338 } else {
339 w.store.RetainWithConfig(ctx, &w.cfg.Compactor, ownsEverythingSharder{}, w)
340 }
341
342 err := w.callSchedulerWithBackoff(ctx, func(ctx context.Context) error {
343 _, err := w.backendScheduler.UpdateJob(ctx, &tempopb.UpdateJobStatusRequest{
344 JobId: resp.JobId,
345 Status: tempopb.JobStatus_JOB_STATUS_SUCCEEDED,
346 })
347 if err != nil {
348 return fmt.Errorf("failed marking job %q as complete: %w", resp.JobId, err)
349 }
350
351 return nil
352 })
353 if err != nil {
354 return w.failJob(ctx, resp.JobId, fmt.Sprintf("error marking job as complete: %v", err))
355 }
356
357 return nil
358}
359
360func (w *BackendWorker) processRedactionJob(ctx context.Context, resp *tempopb.NextJobResponse) error {
361 tenantID := resp.Detail.Tenant

Callers 1

processJobsMethod · 0.95

Calls 6

failJobMethod · 0.95
LogMethod · 0.65
RetainWithConfigMethod · 0.65
UpdateJobMethod · 0.65

Tested by

no test coverage detected