(ctx context.Context, traceID common.ID, meta *backend.BlockMeta, pf *parquet.File, rowGroup int)
| 144 | } |
| 145 | |
| 146 | func findTraceByID(ctx context.Context, traceID common.ID, meta *backend.BlockMeta, pf *parquet.File, rowGroup int) (*tempopb.Trace, error) { |
| 147 | // traceID column index |
| 148 | colIndex, _, maxDef := parquetquery.GetColumnIndexByPath(pf, TraceIDColumnName) |
| 149 | if colIndex == -1 { |
| 150 | return nil, fmt.Errorf("unable to get index for column: %s", TraceIDColumnName) |
| 151 | } |
| 152 | |
| 153 | // If no index then fallback to binary searching the rowgroups. |
| 154 | if rowGroup == -1 { |
| 155 | var ( |
| 156 | numRowGroups = len(pf.RowGroups()) |
| 157 | buf = make(parquet.Row, 1) |
| 158 | err error |
| 159 | ) |
| 160 | |
| 161 | // Cache of row group bounds |
| 162 | rowGroupMins := make([]common.ID, numRowGroups+1) |
| 163 | // todo: restore using meta min/max id once it works |
| 164 | // https://github.com/grafana/tempo/issues/1903 |
| 165 | rowGroupMins[0] = bytes.Repeat([]byte{0}, 16) |
| 166 | rowGroupMins[numRowGroups] = bytes.Repeat([]byte{255}, 16) // This is actually inclusive and the logic is special for the last row group below |
| 167 | |
| 168 | // Gets the minimum trace ID within the row group. Since the column is sorted |
| 169 | // ascending we just read the first value from the first page. |
| 170 | getRowGroupMin := func(rgIdx int) (common.ID, error) { |
| 171 | minID := rowGroupMins[rgIdx] |
| 172 | if len(minID) > 0 { |
| 173 | // Already loaded |
| 174 | return minID, nil |
| 175 | } |
| 176 | |
| 177 | pages := pf.RowGroups()[rgIdx].ColumnChunks()[colIndex].Pages() |
| 178 | defer pages.Close() |
| 179 | |
| 180 | page, err := pages.ReadPage() |
| 181 | if err != nil { |
| 182 | return nil, err |
| 183 | } |
| 184 | defer parquet.Release(page) |
| 185 | |
| 186 | c, err := page.Values().ReadValues(buf) |
| 187 | if err != nil && !errors.Is(err, io.EOF) { |
| 188 | return nil, err |
| 189 | } |
| 190 | if c < 1 { |
| 191 | return nil, fmt.Errorf("failed to read value from page: traceID: %s blockID:%v rowGroupIdx:%d", util.TraceIDToHexString(traceID), meta.BlockID, rgIdx) |
| 192 | } |
| 193 | |
| 194 | // Clone ensures that the byte array is disconnected |
| 195 | // from the underlying i/o buffers. |
| 196 | minID = buf[0].Clone().ByteArray() |
| 197 | rowGroupMins[rgIdx] = minID |
| 198 | return minID, nil |
| 199 | } |
| 200 | |
| 201 | rowGroup, err = binarySearch(numRowGroups, func(rgIdx int) (int, error) { |
| 202 | minID, err := getRowGroupMin(rgIdx) |
| 203 | if err != nil { |
no test coverage detected