MCPcopy
hub / github.com/grafana/tempo / processCompactionJob

Method processCompactionJob

modules/backendworker/backendworker.go:276–327  ·  view source on GitHub ↗
(ctx context.Context, resp *tempopb.NextJobResponse)

Source from the content-addressed store, hash-verified

274}
275
276func (w *BackendWorker) processCompactionJob(ctx context.Context, resp *tempopb.NextJobResponse) error {
277 if resp.Detail.Tenant == "" {
278 metricWorkerBadJobsReceived.WithLabelValues("no_tenant").Inc()
279 return w.failJob(ctx, resp.JobId, "received compaction job with empty tenant")
280 }
281
282 level.Debug(log.Logger).Log("msg", "received job", "job_id", resp.JobId, "tenant", resp.Detail.Tenant)
283
284 blockMetas := w.store.BlockMetas(resp.Detail.Tenant)
285
286 // Collect the metas which match the IDs in the job
287 var sourceMetas []*backend.BlockMeta
288 for _, blockMeta := range blockMetas {
289 for _, blockID := range resp.Detail.Compaction.Input {
290 if blockMeta.BlockID.String() == blockID {
291 sourceMetas = append(sourceMetas, blockMeta)
292 }
293 }
294 }
295
296 // Execute compaction using existing logic
297 newCompacted, err := w.compact(ctx, sourceMetas, resp.Detail.Tenant)
298 if err != nil {
299 return w.failJob(ctx, resp.JobId, fmt.Sprintf("error compacting blocks: %v", err))
300 }
301
302 var newIDs []string
303 for _, blockMeta := range newCompacted {
304 newIDs = append(newIDs, blockMeta.BlockID.String())
305 }
306
307 // Mark job as complete
308 err = w.callSchedulerWithBackoff(ctx, func(ctx context.Context) error {
309 _, err = w.backendScheduler.UpdateJob(ctx, &tempopb.UpdateJobStatusRequest{
310 JobId: resp.JobId,
311 Status: tempopb.JobStatus_JOB_STATUS_SUCCEEDED,
312 Compaction: &tempopb.CompactionDetail{
313 Output: newIDs,
314 },
315 })
316 if err != nil {
317 return fmt.Errorf("failed marking job %q as complete: %w", resp.JobId, err)
318 }
319
320 return nil
321 })
322 if err != nil {
323 return w.failJob(ctx, resp.JobId, fmt.Sprintf("error marking job as complete: %v", err))
324 }
325
326 return nil
327}
328
329func (w *BackendWorker) processRetentionJob(ctx context.Context, resp *tempopb.NextJobResponse) error {
330 tenantID := resp.Detail.Tenant

Callers 1

processJobsMethod · 0.95

Calls 8

failJobMethod · 0.95
compactMethod · 0.95
IncMethod · 0.65
LogMethod · 0.65
BlockMetasMethod · 0.65
UpdateJobMethod · 0.65
StringMethod · 0.45

Tested by

no test coverage detected