MCPcopy
hub / github.com/grafana/tempo / findTraceByID

Function findTraceByID

tempodb/encoding/vparquet4/block_findtracebyid.go:146–279  ·  view source on GitHub ↗
(ctx context.Context, traceID common.ID, meta *backend.BlockMeta, pf *parquet.File, rowGroup int)

Source from the content-addressed store, hash-verified

144}
145
146func findTraceByID(ctx context.Context, traceID common.ID, meta *backend.BlockMeta, pf *parquet.File, rowGroup int) (*tempopb.Trace, error) {
147 // traceID column index
148 colIndex, _, maxDef := parquetquery.GetColumnIndexByPath(pf, TraceIDColumnName)
149 if colIndex == -1 {
150 return nil, fmt.Errorf("unable to get index for column: %s", TraceIDColumnName)
151 }
152
153 // If no index then fallback to binary searching the rowgroups.
154 if rowGroup == -1 {
155 var (
156 numRowGroups = len(pf.RowGroups())
157 buf = make(parquet.Row, 1)
158 err error
159 )
160
161 // Cache of row group bounds
162 rowGroupMins := make([]common.ID, numRowGroups+1)
163 // todo: restore using meta min/max id once it works
164 // https://github.com/grafana/tempo/issues/1903
165 rowGroupMins[0] = bytes.Repeat([]byte{0}, 16)
166 rowGroupMins[numRowGroups] = bytes.Repeat([]byte{255}, 16) // This is actually inclusive and the logic is special for the last row group below
167
168 // Gets the minimum trace ID within the row group. Since the column is sorted
169 // ascending we just read the first value from the first page.
170 getRowGroupMin := func(rgIdx int) (common.ID, error) {
171 minID := rowGroupMins[rgIdx]
172 if len(minID) > 0 {
173 // Already loaded
174 return minID, nil
175 }
176
177 pages := pf.RowGroups()[rgIdx].ColumnChunks()[colIndex].Pages()
178 defer pages.Close()
179
180 page, err := pages.ReadPage()
181 if err != nil {
182 return nil, err
183 }
184 defer parquet.Release(page)
185
186 c, err := page.Values().ReadValues(buf)
187 if err != nil && !errors.Is(err, io.EOF) {
188 return nil, err
189 }
190 if c < 1 {
191 return nil, fmt.Errorf("failed to read value from page: traceID: %s blockID:%v rowGroupIdx:%d", util.TraceIDToHexString(traceID), meta.BlockID, rgIdx)
192 }
193
194 // Clone ensures that the byte array is disconnected
195 // from the underlying i/o buffers.
196 minID = buf[0].Clone().ByteArray()
197 rowGroupMins[rgIdx] = minID
198 return minID, nil
199 }
200
201 rowGroup, err = binarySearch(numRowGroups, func(rgIdx int) (int, error) {
202 minID, err := getRowGroupMin(rgIdx)
203 if err != nil {

Callers 1

FindTraceByIDMethod · 0.70

Calls 15

CloseMethod · 0.95
NextMethod · 0.95
GetColumnIndexByPathFunction · 0.92
TraceIDToHexStringFunction · 0.92
NewSyncIteratorFunction · 0.92
SyncIteratorOptPredicateFunction · 0.92
NewStringInPredicateFunction · 0.92
CloneMethod · 0.80
binarySearchFunction · 0.70
CloseMethod · 0.65

Tested by

no test coverage detected