(ctx context.Context, traceID common.ID, meta *backend.BlockMeta, pf *parquet.File, rowGroup int)
| 137 | } |
| 138 | |
| 139 | func findTraceByID(ctx context.Context, traceID common.ID, meta *backend.BlockMeta, pf *parquet.File, rowGroup int) (*tempopb.TraceByIDResponse, error) { |
| 140 | // traceID column index |
| 141 | colIndex, _, maxDef := parquetquery.GetColumnIndexByPath(pf, TraceIDColumnName) |
| 142 | if colIndex == -1 { |
| 143 | return nil, fmt.Errorf("unable to get index for column: %s", TraceIDColumnName) |
| 144 | } |
| 145 | |
| 146 | // If no index then fall back to binary searching the row groups. |
| 147 | if rowGroup == -1 { |
| 148 | var ( |
| 149 | numRowGroups = len(pf.RowGroups()) |
| 150 | buf = make(parquet.Row, 1) |
| 151 | err error |
| 152 | ) |
| 153 | |
| 154 | // Cache of row group bounds |
| 155 | rowGroupMins := make([]common.ID, numRowGroups+1) |
| 156 | // todo: restore using meta min/max id once it works |
| 157 | // https://github.com/grafana/tempo/issues/1903 |
| 158 | rowGroupMins[0] = bytes.Repeat([]byte{0}, 16) |
| 159 | rowGroupMins[numRowGroups] = bytes.Repeat([]byte{255}, 16) // This is actually inclusive and the logic is special for the last row group below |
| 160 | |
| 161 | // getRowGroupMin returns the minimum trace ID of row group rgIdx, caching the result in rowGroupMins. Since the column is sorted |
| 162 | // ascending we just read the first value from the first page; io.EOF from that read is tolerated, but reading zero values is an error. |
| 163 | getRowGroupMin := func(rgIdx int) (common.ID, error) { |
| 164 | minID := rowGroupMins[rgIdx] |
| 165 | if len(minID) > 0 { |
| 166 | // Already loaded: either cached by an earlier call or preset when rowGroupMins was initialized |
| 167 | return minID, nil |
| 168 | } |
| 169 | |
| 170 | pages := pf.RowGroups()[rgIdx].ColumnChunks()[colIndex].Pages() |
| 171 | defer pages.Close() |
| 172 | |
| 173 | page, err := pages.ReadPage() |
| 174 | if err != nil { |
| 175 | return nil, err |
| 176 | } |
| 177 | defer parquet.Release(page) |
| 178 | |
| 179 | c, err := page.Values().ReadValues(buf) |
| 180 | if err != nil && !errors.Is(err, io.EOF) { |
| 181 | return nil, err |
| 182 | } |
| 183 | if c < 1 { |
| 184 | return nil, fmt.Errorf("failed to read value from page: traceID: %s blockID:%v rowGroupIdx:%d", util.TraceIDToHexString(traceID), meta.BlockID, rgIdx) |
| 185 | } |
| 186 | |
| 187 | // Clone ensures that the byte array is disconnected |
| 188 | // from the underlying i/o buffers, so the ID stored in rowGroupMins stays usable after the deferred page release. |
| 189 | minID = buf[0].Clone().ByteArray() |
| 190 | rowGroupMins[rgIdx] = minID |
| 191 | return minID, nil |
| 192 | } |
| 193 | |
| 194 | rowGroup, err = binarySearch(numRowGroups, func(rgIdx int) (int, error) { |
| 195 | minID, err := getRowGroupMin(rgIdx) |
| 196 | if err != nil { |
no test coverage detected