( ctx context.Context, tenant string, )
| 394 | } |
| 395 | |
| 396 | func (rw *readerWriter) ListBlocks( |
| 397 | ctx context.Context, |
| 398 | tenant string, |
| 399 | ) ([]uuid.UUID, []uuid.UUID, error) { |
| 400 | ctx, span := tracer.Start(ctx, "readerWriter.ListBlocks") |
| 401 | defer span.End() |
| 402 | |
| 403 | var ( |
| 404 | wg = sync.WaitGroup{} |
| 405 | mtx = sync.Mutex{} |
| 406 | bb = blockboundary.CreateBlockBoundaries(rw.cfg.ListBlocksConcurrency) |
| 407 | errChan = make(chan error, len(bb)) |
| 408 | keypath = backend.KeyPathWithPrefix(backend.KeyPath{tenant}, rw.cfg.Prefix) |
| 409 | minID uuid.UUID |
| 410 | maxID uuid.UUID |
| 411 | blockIDs = make([]uuid.UUID, 0, 1000) |
| 412 | compactedBlockIDs = make([]uuid.UUID, 0, 1000) |
| 413 | ) |
| 414 | |
| 415 | prefix := path.Join(keypath...) |
| 416 | if len(prefix) > 0 { |
| 417 | prefix += "/" |
| 418 | } |
| 419 | |
| 420 | for i := 0; i < len(bb)-1; i++ { |
| 421 | minID = uuid.UUID(bb[i]) |
| 422 | maxID = uuid.UUID(bb[i+1]) |
| 423 | |
| 424 | wg.Add(1) |
| 425 | go func(minUUID, maxUUID uuid.UUID) { |
| 426 | defer wg.Done() |
| 427 | |
| 428 | var ( |
| 429 | err error |
| 430 | res minio.ListBucketV2Result |
| 431 | startAfter = prefix + minUUID.String() |
| 432 | ) |
| 433 | |
| 434 | for res.IsTruncated = true; res.IsTruncated; { |
| 435 | if ctx.Err() != nil { |
| 436 | return |
| 437 | } |
| 438 | |
| 439 | res, err = rw.core.ListObjectsV2(rw.cfg.Bucket, prefix, startAfter, res.NextContinuationToken, "", 0) |
| 440 | if err != nil { |
| 441 | errChan <- fmt.Errorf("error finding objects in s3 bucket, bucket: %s: %w", rw.cfg.Bucket, err) |
| 442 | return |
| 443 | } |
| 444 | |
| 445 | for _, c := range res.Contents { |
| 446 | // i.e: <blockID>/meta |
| 447 | parts := strings.Split(strings.TrimPrefix(c.Key, prefix), "/") |
| 448 | if len(parts) != 2 { |
| 449 | continue |
| 450 | } |
| 451 | |
| 452 | switch parts[1] { |
| 453 | case backend.MetaName: |
Nothing calls this directly.
No test coverage detected.