processCompleteOp completes a single block. Returns an error if global loop should exit.
(op *completeOp)
| 105 | |
| 106 | // processCompleteOp completes a single block. Returns an error if global loop should exit. |
| 107 | func (s *LiveStore) processCompleteOp(op *completeOp) error { |
| 108 | ctx, span := tracer.Start(s.ctx, "LiveStore.processCompleteOp", |
| 109 | oteltrace.WithAttributes( |
| 110 | attribute.String("tenant", op.tenantID), |
| 111 | attribute.String("blockID", op.blockID.String()), |
| 112 | attribute.Int("attempt", op.attempts), |
| 113 | )) |
| 114 | defer span.End() |
| 115 | |
| 116 | start := time.Now() |
| 117 | inst, err := s.getOrCreateInstance(op.tenantID) |
| 118 | if err != nil { |
| 119 | level.Error(s.logger).Log("msg", "failed to retrieve instance for completion", "tenant", op.tenantID, "err", err) |
| 120 | observeFailedOp(op) |
| 121 | span.SetStatus(codes.Error, err.Error()) |
| 122 | span.RecordError(err) |
| 123 | return err |
| 124 | } |
| 125 | |
| 126 | // If the context is cancelled (shutdown), abandon the completion. The WAL block remains on |
| 127 | // disk and will be re-enqueued by reloadBlocks() on next startup. |
| 128 | if ctx.Err() != nil { |
| 129 | level.Info(s.logger).Log("msg", "abandoning WAL block completion on shutdown, will replay on restart", "tenant", op.tenantID, "block", op.blockID) |
| 130 | s.completeQueues.Clear(op) |
| 131 | return nil |
| 132 | } |
| 133 | |
| 134 | completeBlock, err := inst.completeBlock(ctx, op.blockID) |
| 135 | if err != nil { |
| 136 | metricCompletionDuration.Observe(time.Since(start).Seconds()) |
| 137 | s.retryCompleteOp(op, span, "failed to complete block", err) |
| 138 | return nil |
| 139 | } |
| 140 | |
| 141 | if completeBlock == nil { |
| 142 | // completeBlock only returns a block when this call converts a WAL block. |
| 143 | // On a retry after lifecycle handling fails, the WAL block may already be |
| 144 | // gone while the completed block is still present in inst.completeBlocks. |
| 145 | completeBlock = inst.blocks.Load().completeBlocks[op.blockID] |
| 146 | } |
| 147 | |
| 148 | if completeBlock != nil { |
| 149 | if err := s.completeBlockLifecycle.onCompletedBlock(ctx, op.tenantID, completeBlock); err != nil { |
| 150 | metricCompletionDuration.Observe(time.Since(start).Seconds()) |
| 151 | s.retryCompleteOp(op, span, "failed to apply complete block lifecycle", err) |
| 152 | return nil |
| 153 | } |
| 154 | } |
| 155 | |
| 156 | metricCompletionDuration.Observe(time.Since(start).Seconds()) |
| 157 | metricBlocksCompleted.Inc() |
| 158 | s.completeQueues.Clear(op) |
| 159 | return nil |
| 160 | } |
| 161 | |
| 162 | func (s *LiveStore) retryCompleteOp(op *completeOp, span oteltrace.Span, msg string, err error) { |
| 163 | level.Error(s.logger).Log("msg", msg, "tenant", op.tenantID, "block", op.blockID, "err", err) |