| 390 | } |
| 391 | |
| 392 | func (p *Poller) pollUnknown( |
| 393 | ctx context.Context, |
| 394 | unknownBlocks map[uuid.UUID]bool, |
| 395 | tenantID string, |
| 396 | ) ([]*backend.BlockMeta, []*backend.CompactedBlockMeta, error) { |
| 397 | derivedCtx, span := tracer.Start(ctx, "pollUnknown", trace.WithAttributes( |
| 398 | attribute.Int("unknownBlockIDs", len(unknownBlocks)), |
| 399 | )) |
| 400 | defer span.End() |
| 401 | |
| 402 | var ( |
| 403 | err error |
| 404 | errs []error |
| 405 | mtx sync.Mutex |
| 406 | bg = boundedwaitgroup.New(p.cfg.PollConcurrency) |
| 407 | newBlockList = make([]*backend.BlockMeta, 0, len(unknownBlocks)) |
| 408 | newCompactedBlocklist = make([]*backend.CompactedBlockMeta, 0, len(unknownBlocks)) |
| 409 | ) |
| 410 | |
| 411 | for blockID, compacted := range unknownBlocks { |
| 412 | // Avoid polling if we've already encountered an error |
| 413 | mtx.Lock() |
| 414 | if len(errs) > 0 { |
| 415 | mtx.Unlock() |
| 416 | break |
| 417 | } |
| 418 | mtx.Unlock() |
| 419 | |
| 420 | bg.Add(1) |
| 421 | go func(id uuid.UUID, compacted bool) { |
| 422 | defer bg.Done() |
| 423 | |
| 424 | if p.cfg.PollJitterMs > 0 { |
| 425 | time.Sleep(time.Duration(rand.Intn(p.cfg.PollJitterMs)) * time.Millisecond) |
| 426 | } |
| 427 | |
| 428 | m, cm, pollBlockErr := p.pollBlock(derivedCtx, tenantID, id, compacted) |
| 429 | mtx.Lock() |
| 430 | defer mtx.Unlock() |
| 431 | if m != nil { |
| 432 | newBlockList = append(newBlockList, m) |
| 433 | return |
| 434 | } |
| 435 | |
| 436 | if cm != nil { |
| 437 | newCompactedBlocklist = append(newCompactedBlocklist, cm) |
| 438 | return |
| 439 | } |
| 440 | |
| 441 | if pollBlockErr != nil { |
| 442 | errs = append(errs, pollBlockErr) |
| 443 | } |
| 444 | }(blockID, compacted) |
| 445 | } |
| 446 | |
| 447 | bg.Wait() |
| 448 | |
| 449 | if len(errs) > 0 { |