| 86 | } |
| 87 | |
| 88 | func queryBucket(ctx context.Context, percentage float32, r backend.Reader, c backend.Compactor, tenantID string, traceID common.ID) ([]queryResults, error) { |
| 89 | blockIDs, compactedBlockIDs, err := r.Blocks(context.Background(), tenantID) |
| 90 | if err != nil { |
| 91 | return nil, err |
| 92 | } |
| 93 | |
| 94 | if percentage > 0 { |
| 95 | // shuffle |
| 96 | rand.Shuffle(len(blockIDs), func(i, j int) { blockIDs[i], blockIDs[j] = blockIDs[j], blockIDs[i] }) |
| 97 | |
| 98 | // get the first n% |
| 99 | total := len(blockIDs) |
| 100 | total = int(float32(total) * percentage) |
| 101 | blockIDs = blockIDs[:total] |
| 102 | } |
| 103 | fmt.Println("total blocks to search: ", len(blockIDs)) |
| 104 | |
| 105 | blockIDs = append(blockIDs, compactedBlockIDs...) |
| 106 | |
| 107 | // Load in parallel |
| 108 | wg := boundedwaitgroup.New(100) |
| 109 | resultsCh := make(chan queryResults, len(blockIDs)) |
| 110 | |
| 111 | for blockNum, id := range blockIDs { |
| 112 | wg.Add(1) |
| 113 | |
| 114 | go func(blockNum2 int, id2 uuid.UUID) { |
| 115 | defer wg.Done() |
| 116 | |
| 117 | // search here |
| 118 | q, err := queryBlock(ctx, r, c, blockNum2, id2, tenantID, traceID) |
| 119 | if err != nil { |
| 120 | fmt.Println("Error querying block:", err) |
| 121 | return |
| 122 | } |
| 123 | |
| 124 | if q != nil { |
| 125 | resultsCh <- *q |
| 126 | } |
| 127 | }(blockNum, id) |
| 128 | } |
| 129 | |
| 130 | wg.Wait() |
| 131 | close(resultsCh) |
| 132 | |
| 133 | results := make([]queryResults, 0) |
| 134 | for q := range resultsCh { |
| 135 | results = append(results, q) |
| 136 | } |
| 137 | |
| 138 | return results, nil |
| 139 | } |
| 140 | |
| 141 | func queryBlock(ctx context.Context, r backend.Reader, _ backend.Compactor, blockNum int, id uuid.UUID, tenantID string, traceID common.ID) (*queryResults, error) { |
| 142 | fmt.Print(".") |