buildBackendRequests returns a slice of requests that cover all blocks in the store that are covered by start/end.
(ctx context.Context, tenantID string, parent pipeline.Request, searchReq *tempopb.SearchRequest, firstShardIdx int, blockIter func(shardIterFn, jobIterFn), reqCh chan<- pipeline.Request, errFn func(error))
| 299 | // buildBackendRequests returns a slice of requests that cover all blocks in the store |
| 300 | // that are covered by start/end. |
| 301 | func buildBackendRequests(ctx context.Context, tenantID string, parent pipeline.Request, searchReq *tempopb.SearchRequest, firstShardIdx int, blockIter func(shardIterFn, jobIterFn), reqCh chan<- pipeline.Request, errFn func(error)) { |
| 302 | defer close(reqCh) |
| 303 | |
| 304 | queryHash := hashForSearchRequest(searchReq) |
| 305 | colsToJSON := api.NewDedicatedColumnsToJSON() |
| 306 | |
| 307 | blockIter(nil, func(m *backend.BlockMeta, shard, startPage, pages int) { |
| 308 | blockID := m.BlockID.String() |
| 309 | |
| 310 | dedColsJSON, err := colsToJSON.JSONForDedicatedColumns(m.DedicatedColumns) |
| 311 | if err != nil { |
| 312 | errFn(fmt.Errorf("failed to convert dedicated columns. block: %s tempopb: %w", blockID, err)) |
| 313 | return |
| 314 | } |
| 315 | |
| 316 | pipelineR, err := cloneRequestforQueriers(parent, tenantID, func(r *http.Request) (*http.Request, error) { |
| 317 | r, err = api.BuildSearchBlockRequest(r, &tempopb.SearchBlockRequest{ |
| 318 | SearchReq: searchReq, |
| 319 | BlockID: blockID, |
| 320 | StartPage: uint32(startPage), |
| 321 | PagesToSearch: uint32(pages), |
| 322 | IndexPageSize: m.IndexPageSize, |
| 323 | TotalRecords: m.TotalRecords, |
| 324 | Version: m.Version, |
| 325 | Size_: m.Size_, |
| 326 | FooterSize: m.FooterSize, |
| 327 | // DedicatedColumns: dc, for perf reason we pass dedicated columns json in directly to not have to realloc object -> proto -> json |
| 328 | }, dedColsJSON) |
| 329 | |
| 330 | return r, err |
| 331 | }) |
| 332 | if err != nil { |
| 333 | errFn(fmt.Errorf("failed to build search block request. block: %s tempopb: %w", blockID, err)) |
| 334 | return |
| 335 | } |
| 336 | |
| 337 | startTime := time.Unix(int64(searchReq.Start), 0) |
| 338 | endTime := time.Unix(int64(searchReq.End), 0) |
| 339 | key := searchJobCacheKey(tenantID, queryHash, startTime, endTime, m, startPage, pages) |
| 340 | pipelineR.SetCacheKey(key) |
| 341 | pipelineR.SetResponseData(firstShardIdx + shard) |
| 342 | |
| 343 | select { |
| 344 | case reqCh <- pipelineR: |
| 345 | case <-ctx.Done(): |
| 346 | // ignore the error if there is one. it will be handled elsewhere |
| 347 | return |
| 348 | } |
| 349 | }) |
| 350 | } |
| 351 | |
| 352 | // hashForSearchRequest returns a uint64 hash of the query. if the query is invalid it returns a 0 hash. |
| 353 | // before hashing the query is forced into a canonical form so equivalent queries will hash to the same value. |