iterateBlocks iterates over all blocks (head, wal, complete), processing them concurrently with bounded concurrency.
(ctx context.Context, reqStart, reqEnd time.Time, fn blockFn)
| 57 | // iterateBlocks provides a way to iterate over all blocks (head, wal, complete) |
| 58 | // using concurrent processing with bounded concurrency. |
| 59 | func (i *instance) iterateBlocks(ctx context.Context, reqStart, reqEnd time.Time, fn blockFn) error { |
| 60 | ctx, span := tracer.Start(ctx, "instance.iterateBlocks", |
| 61 | oteltrace.WithAttributes(attribute.String("tenant", i.tenantID))) |
| 62 | defer span.End() |
| 63 | |
| 64 | snap := i.blocks.Load() |
| 65 | |
| 66 | var anyErr atomic.Error |
| 67 | ctx, cancel := context.WithCancel(ctx) |
| 68 | defer cancel() |
| 69 | |
| 70 | handleErr := func(err error) { |
| 71 | if err == nil { |
| 72 | return |
| 73 | } |
| 74 | cancel() |
| 75 | |
| 76 | // we're not storing errComplete for obvious reasons. context.Canceled is ignored b/c it may be due to the |
| 77 | // cancel above, which is not an error. if the context is cancelled by something above this method, then the caller |
| 78 | // can still detect and return it |
| 79 | if errors.Is(err, errComplete) || errors.Is(err, context.Canceled) { |
| 80 | return |
| 81 | } |
| 82 | anyErr.Store(err) |
| 83 | } |
| 84 | |
| 85 | // headBlock meta is mutated in place by AppendTrace; use MetaSnapshot |
| 86 | // for a stable copy. |
| 87 | if snap.headBlock != nil { |
| 88 | meta := snap.headBlock.MetaSnapshot() |
| 89 | if includeBlock(meta, reqStart, reqEnd) { |
| 90 | ctx, span := tracer.Start(ctx, "process.headBlock") |
| 91 | span.SetAttributes(attribute.String("blockID", meta.BlockID.String())) |
| 92 | |
| 93 | func() { |
| 94 | defer func() { |
| 95 | if r := recover(); r != nil { |
| 96 | level.Error(i.logger).Log("msg", "panic in iterateBlocks head block", "blockID", meta.BlockID, "panic", r, "stack", string(debug.Stack())) |
| 97 | handleErr(fmt.Errorf("processing head block (%s): panic: %v", meta.BlockID, r)) |
| 98 | } |
| 99 | }() |
| 100 | if err := fn(ctx, meta, snap.headBlock); err != nil { |
| 101 | handleErr(fmt.Errorf("processing head block (%s): %w", meta.BlockID, err)) |
| 102 | } |
| 103 | }() |
| 104 | span.End() |
| 105 | } |
| 106 | } |
| 107 | |
| 108 | if err := anyErr.Load(); err != nil { |
| 109 | return err |
| 110 | } |
| 111 | |
| 112 | wg := boundedwaitgroup.New(i.Cfg.QueryBlockConcurrency) |
| 113 | |
| 114 | // Process wal blocks |
| 115 | for _, b := range snap.walBlocks { |
| 116 | if ctx.Err() != nil { |