MCPcopy
hub / github.com/grafana/tempo / buildBackendRequests

Function buildBackendRequests

modules/frontend/search_sharder.go:301–350  ·  view source on GitHub ↗

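buildBackendRequests does not return anything: it builds one backend search request per job for all blocks in the store that are covered by start/end, sends each request on reqCh, and closes reqCh when it is done. Failures are reported through errFn.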

(ctx context.Context, tenantID string, parent pipeline.Request, searchReq *tempopb.SearchRequest, firstShardIdx int, blockIter func(shardIterFn, jobIterFn), reqCh chan<- pipeline.Request, errFn func(error))

Source from the content-addressed store, hash-verified

299// buildBackendRequests returns a slice of requests that cover all blocks in the store
300// that are covered by start/end.
301func buildBackendRequests(ctx context.Context, tenantID string, parent pipeline.Request, searchReq *tempopb.SearchRequest, firstShardIdx int, blockIter func(shardIterFn, jobIterFn), reqCh chan<- pipeline.Request, errFn func(error)) {
302 defer close(reqCh)
303
304 queryHash := hashForSearchRequest(searchReq)
305 colsToJSON := api.NewDedicatedColumnsToJSON()
306
307 blockIter(nil, func(m *backend.BlockMeta, shard, startPage, pages int) {
308 blockID := m.BlockID.String()
309
310 dedColsJSON, err := colsToJSON.JSONForDedicatedColumns(m.DedicatedColumns)
311 if err != nil {
312 errFn(fmt.Errorf("failed to convert dedicated columns. block: %s tempopb: %w", blockID, err))
313 return
314 }
315
316 pipelineR, err := cloneRequestforQueriers(parent, tenantID, func(r *http.Request) (*http.Request, error) {
317 r, err = api.BuildSearchBlockRequest(r, &tempopb.SearchBlockRequest{
318 SearchReq: searchReq,
319 BlockID: blockID,
320 StartPage: uint32(startPage),
321 PagesToSearch: uint32(pages),
322 IndexPageSize: m.IndexPageSize,
323 TotalRecords: m.TotalRecords,
324 Version: m.Version,
325 Size_: m.Size_,
326 FooterSize: m.FooterSize,
327 // DedicatedColumns: dc, for perf reason we pass dedicated columns json in directly to not have to realloc object -> proto -> json
328 }, dedColsJSON)
329
330 return r, err
331 })
332 if err != nil {
333 errFn(fmt.Errorf("failed to build search block request. block: %s tempopb: %w", blockID, err))
334 return
335 }
336
337 startTime := time.Unix(int64(searchReq.Start), 0)
338 endTime := time.Unix(int64(searchReq.End), 0)
339 key := searchJobCacheKey(tenantID, queryHash, startTime, endTime, m, startPage, pages)
340 pipelineR.SetCacheKey(key)
341 pipelineR.SetResponseData(firstShardIdx + shard)
342
343 select {
344 case reqCh <- pipelineR:
345 case <-ctx.Done():
346 // ignore the error if there is one. it will be handled elsewhere
347 return
348 }
349 })
350}
351
352// hashForSearchRequest returns a uint64 hash of the query. if the query is invalid it returns a 0 hash.
353// before hashing the query is forced into a canonical form so equivalent queries will hash to the same value.

Callers 3

TestBuildBackendRequestsFunction · 0.85
backendRequestsMethod · 0.85

Calls 10

BuildSearchBlockRequestFunction · 0.92
hashForSearchRequestFunction · 0.85
cloneRequestforQueriersFunction · 0.85
searchJobCacheKeyFunction · 0.85
SetCacheKeyMethod · 0.65
SetResponseDataMethod · 0.65
DoneMethod · 0.65
StringMethod · 0.45

Tested by 2

TestBuildBackendRequestsFunction · 0.68