ListBlocks implements backend.Reader
(ctx context.Context, tenant string)
| 210 | |
| 211 | // ListBlocks implements backend.Reader |
| 212 | func (rw *readerWriter) ListBlocks(ctx context.Context, tenant string) ([]uuid.UUID, []uuid.UUID, error) { |
| 213 | ctx, span := tracer.Start(ctx, "readerWriter.ListBlocks") |
| 214 | defer span.End() |
| 215 | |
| 216 | var ( |
| 217 | wg sync.WaitGroup |
| 218 | mtx sync.Mutex |
| 219 | bb = blockboundary.CreateBlockBoundaries(rw.cfg.ListBlocksConcurrency) |
| 220 | errChan = make(chan error, len(bb)) |
| 221 | keypath = backend.KeyPathWithPrefix(backend.KeyPath{tenant}, rw.cfg.Prefix) |
| 222 | minID uuid.UUID |
| 223 | maxID uuid.UUID |
| 224 | blockIDs = make([]uuid.UUID, 0, 1000) |
| 225 | compactedBlockIDs = make([]uuid.UUID, 0, 1000) |
| 226 | ) |
| 227 | |
| 228 | prefix := path.Join(keypath...) |
| 229 | if len(prefix) > 0 { |
| 230 | prefix += "/" |
| 231 | } |
| 232 | |
| 233 | for i := 0; i < len(bb)-1; i++ { |
| 234 | minID = uuid.UUID(bb[i]) |
| 235 | maxID = uuid.UUID(bb[i+1]) |
| 236 | |
| 237 | wg.Add(1) |
| 238 | go func(minUUID, maxUUID uuid.UUID) { |
| 239 | defer wg.Done() |
| 240 | |
| 241 | var ( |
| 242 | query = &storage.Query{ |
| 243 | Prefix: prefix, |
| 244 | Delimiter: "", |
| 245 | Versions: false, |
| 246 | StartOffset: prefix + minUUID.String(), |
| 247 | } |
| 248 | parts []string |
| 249 | id uuid.UUID |
| 250 | ) |
| 251 | |
| 252 | // If max is global max, then we don't want to set an end offset to |
| 253 | // ensure we reach the end. EndOffset is exclusive. |
| 254 | if maxUUID != backend.GlobalMaxBlockID { |
| 255 | query.EndOffset = prefix + maxUUID.String() |
| 256 | } |
| 257 | |
| 258 | iter := rw.bucket.Objects(ctx, query) |
| 259 | |
| 260 | for { |
| 261 | if ctx.Err() != nil { |
| 262 | return |
| 263 | } |
| 264 | |
| 265 | attrs, err := iter.Next() |
| 266 | if errors.Is(err, iterator.Done) { |
| 267 | return |
| 268 | } |
| 269 | if err != nil { |
nothing calls this directly
no test coverage detected