MCPcopy
hub / github.com/grafana/tempo / processCompleteOp

Method processCompleteOp

modules/livestore/live_store_background.go:107–160  ·  view source on GitHub ↗

processCompleteOp completes a single block. It returns an error if the global loop should exit.

(op *completeOp)

Source from the content-addressed store, hash-verified

105
106// processCompleteOp completes a single block. Returns an error if global loop should exit.
107func (s *LiveStore) processCompleteOp(op *completeOp) error {
108 ctx, span := tracer.Start(s.ctx, "LiveStore.processCompleteOp",
109 oteltrace.WithAttributes(
110 attribute.String("tenant", op.tenantID),
111 attribute.String("blockID", op.blockID.String()),
112 attribute.Int("attempt", op.attempts),
113 ))
114 defer span.End()
115
116 start := time.Now()
117 inst, err := s.getOrCreateInstance(op.tenantID)
118 if err != nil {
119 level.Error(s.logger).Log("msg", "failed to retrieve instance for completion", "tenant", op.tenantID, "err", err)
120 observeFailedOp(op)
121 span.SetStatus(codes.Error, err.Error())
122 span.RecordError(err)
123 return err
124 }
125
126 // If the context is cancelled (shutdown), abandon the completion. The WAL block remains on
127 // disk and will be re-enqueued by reloadBlocks() on next startup.
128 if ctx.Err() != nil {
129 level.Info(s.logger).Log("msg", "abandoning WAL block completion on shutdown, will replay on restart", "tenant", op.tenantID, "block", op.blockID)
130 s.completeQueues.Clear(op)
131 return nil
132 }
133
134 completeBlock, err := inst.completeBlock(ctx, op.blockID)
135 if err != nil {
136 metricCompletionDuration.Observe(time.Since(start).Seconds())
137 s.retryCompleteOp(op, span, "failed to complete block", err)
138 return nil
139 }
140
141 if completeBlock == nil {
142 // completeBlock only returns a block when this call converts a WAL block.
143 // On a retry after lifecycle handling fails, the WAL block may already be
144 // gone while the completed block is still present in inst.completeBlocks.
145 completeBlock = inst.blocks.Load().completeBlocks[op.blockID]
146 }
147
148 if completeBlock != nil {
149 if err := s.completeBlockLifecycle.onCompletedBlock(ctx, op.tenantID, completeBlock); err != nil {
150 metricCompletionDuration.Observe(time.Since(start).Seconds())
151 s.retryCompleteOp(op, span, "failed to apply complete block lifecycle", err)
152 return nil
153 }
154 }
155
156 metricCompletionDuration.Observe(time.Since(start).Seconds())
157 metricBlocksCompleted.Inc()
158 s.completeQueues.Clear(op)
159 return nil
160}
161
162func (s *LiveStore) retryCompleteOp(op *completeOp, span oteltrace.Span, msg string, err error) {
163 level.Error(s.logger).Log("msg", msg, "tenant", op.tenantID, "block", op.blockID, "err", err)

Calls 14

getOrCreateInstanceMethod · 0.95
retryCompleteOpMethod · 0.95
observeFailedOpFunction · 0.85
IntMethod · 0.80
completeBlockMethod · 0.80
StartMethod · 0.65
NowMethod · 0.65
LogMethod · 0.65
ErrorMethod · 0.65
ClearMethod · 0.65
ObserveMethod · 0.65
onCompletedBlockMethod · 0.65