MCPcopy
hub / github.com/grafana/tempo / running

Method running

modules/blockbuilder/blockbuilder.go:225–258  ·  view source on GitHub ↗
(ctx context.Context)

Source from the content-addressed store, hash-verified

223}
224
225func (b *BlockBuilder) running(ctx context.Context) error {
226 defer close(b.consumeStopped)
227 for {
228 // Create a detached context for consume
229 // This is so that when the parent context is canceled and the block builder is stopping,
230 // we still finish the current consumption and flush of blocks. That is preferred than
231 // to starting over after a restart.
232 consumeCtx, cancel := context.WithCancel(context.Background())
233
234 waitTime, err := b.consume(consumeCtx)
235 cancel() // Always cancel the context after consume completes
236
237 if err != nil {
238 level.Error(b.logger).Log("msg", "consumeCycle failed", "err", err)
239 }
240
241 // Always check for cancellation before going to next cycle.
242 // There are cases like when the queue is lagged, that waitTime could be zero.
243 // In this case it's non-deterministic which select statement will be executed below,
244 // so we do a specific check here first.
245 if ctx.Err() != nil {
246 // Parent context canceled, return
247 return nil
248 }
249
250 // Now wait for next cycle or cancellation.
251 select {
252 case <-time.After(waitTime): // Continue with next cycle
253 case <-ctx.Done():
254 // Parent context canceled, return
255 return nil
256 }
257 }
258}
259
260// It consumes records for all the asigneed partitions, priorizing the ones with more lag. It keeps consuming until
261// all the partitions lag is less than the cycle duration. When that happen it returns time to wait before another consuming cycle, based on the last record timestamp

Callers

nothing calls this directly

Calls 4

consumeMethod · 0.95
LogMethod · 0.65
ErrorMethod · 0.65
DoneMethod · 0.65

Tested by

no test coverage detected