blocksWithAnyTraceID returns all blocks that contain any of the trace IDs. It is enough to know if a block contains one of the trace IDs since we will open each block and skip any of the trace IDs which are passed into the command.
(ctx context.Context, r backend.Reader, logger log.Logger, tenantID string, traceIDs ...common.ID)
| 179 | // open each block and skip any of the trace IDs which are passed into the |
| 180 | // command. |
| 181 | func (cmd *dropTracesCmd) blocksWithAnyTraceID(ctx context.Context, r backend.Reader, logger log.Logger, tenantID string, traceIDs ...common.ID) ([]*backend.BlockMeta, error) { |
| 182 | blockIDs, _, err := r.Blocks(ctx, tenantID) |
| 183 | if err != nil { |
| 184 | return nil, err |
| 185 | } |
| 186 | |
| 187 | // Load in parallel |
| 188 | wg := boundedwaitgroup.New(100) |
| 189 | resultsCh := make(chan *backend.BlockMeta, len(blockIDs)) |
| 190 | |
| 191 | for blockNum, id := range blockIDs { |
| 192 | wg.Add(1) |
| 193 | |
| 194 | go func(blockNum2 int, id2 uuid.UUID) { |
| 195 | defer wg.Done() |
| 196 | |
| 197 | // search here |
| 198 | meta, err := isInBlock(ctx, r, cmd.Background, blockNum2, id2, tenantID, traceIDs...) |
| 199 | if err != nil { |
| 200 | level.Error(logger).Log("msg", "error querying block", "block", id2, "err", err) |
| 201 | return |
| 202 | } |
| 203 | |
| 204 | if meta != nil { |
| 205 | resultsCh <- meta |
| 206 | } |
| 207 | }(blockNum, id) |
| 208 | } |
| 209 | |
| 210 | wg.Wait() |
| 211 | close(resultsCh) |
| 212 | |
| 213 | results := make([]*backend.BlockMeta, 0, len(resultsCh)) |
| 214 | for q := range resultsCh { |
| 215 | results = append(results, q) |
| 216 | } |
| 217 | |
| 218 | return results, nil |
| 219 | } |
| 220 | |
| 221 | func isInBlock(ctx context.Context, r backend.Reader, background bool, blockNum int, id uuid.UUID, tenantID string, traceIDs ...common.ID) (*backend.BlockMeta, error) { |
| 222 | if !background { |