MCPcopy
hub / github.com/grafana/tempo / iterateBlocks

Method iterateBlocks

modules/livestore/instance_search.go:59–191  ·  view source on GitHub ↗

iterateBlocks provides a way to iterate over all blocks (head, wal, complete) using concurrent processing with bounded concurrency.

(ctx context.Context, reqStart, reqEnd time.Time, fn blockFn)

Source from the content-addressed store, hash-verified

57// iterateBlocks provides a way to iterate over all blocks (head, wal, complete)
58// using concurrent processing with bounded concurrency.
59func (i *instance) iterateBlocks(ctx context.Context, reqStart, reqEnd time.Time, fn blockFn) error {
60 ctx, span := tracer.Start(ctx, "instance.iterateBlocks",
61 oteltrace.WithAttributes(attribute.String("tenant", i.tenantID)))
62 defer span.End()
63
64 snap := i.blocks.Load()
65
66 var anyErr atomic.Error
67 ctx, cancel := context.WithCancel(ctx)
68 defer cancel()
69
70 handleErr := func(err error) {
71 if err == nil {
72 return
73 }
74 cancel()
75
76 // we're not storing errComplete for obvious reasons. context.Canceled is ignored b/c it may be due to the
77 // cancel above, which is not an error. if the context is cancelled by something above this method, then the caller
78 // can still detect and return it.
79 if errors.Is(err, errComplete) || errors.Is(err, context.Canceled) {
80 return
81 }
82 anyErr.Store(err)
83 }
84
85 // headBlock meta is mutated in place by AppendTrace; use MetaSnapshot
86 // for a stable copy.
87 if snap.headBlock != nil {
88 meta := snap.headBlock.MetaSnapshot()
89 if includeBlock(meta, reqStart, reqEnd) {
90 ctx, span := tracer.Start(ctx, "process.headBlock")
91 span.SetAttributes(attribute.String("blockID", meta.BlockID.String()))
92
93 func() {
94 defer func() {
95 if r := recover(); r != nil {
96 level.Error(i.logger).Log("msg", "panic in iterateBlocks head block", "blockID", meta.BlockID, "panic", r, "stack", string(debug.Stack()))
97 handleErr(fmt.Errorf("processing head block (%s): panic: %v", meta.BlockID, r))
98 }
99 }()
100 if err := fn(ctx, meta, snap.headBlock); err != nil {
101 handleErr(fmt.Errorf("processing head block (%s): %w", meta.BlockID, err))
102 }
103 }()
104 span.End()
105 }
106 }
107
108 if err := anyErr.Load(); err != nil {
109 return err
110 }
111
112 wg := boundedwaitgroup.New(i.Cfg.QueryBlockConcurrency)
113
114 // Process wal blocks
115 for _, b := range snap.walBlocks {
116 if ctx.Err() != nil {

Callers 7

SearchMethod · 0.95
SearchTagsV2Method · 0.95
SearchTagValuesMethod · 0.95
SearchTagValuesV2Method · 0.95
FindByTraceIDMethod · 0.95
QueryRangeMethod · 0.95

Calls 12

NewFunction · 0.92
includeBlockFunction · 0.70
StartMethod · 0.65
StoreMethod · 0.65
MetaSnapshotMethod · 0.65
LogMethod · 0.65
ErrorMethod · 0.65
BlockMetaMethod · 0.65
AddMethod · 0.65
DoneMethod · 0.65
WaitMethod · 0.65
StringMethod · 0.45

Tested by 1