MCPcopy
hub / github.com/grafana/tempo / NewQueryRange

Function NewQueryRange

modules/frontend/combiner/metrics_query_range.go:39–156  ·  view source on GitHub ↗

NewQueryRange returns a query range combiner.

(req *tempopb.QueryRangeRequest, maxSeriesLimit int)

Source from the content-addressed store, hash-verified

37
38// NewQueryRange returns a query range combiner.
39func NewQueryRange(req *tempopb.QueryRangeRequest, maxSeriesLimit int) (Combiner, error) {
40 // if a limit is being enforced, honor the request if it is less than the limit
41 // else set it to max limit
42 maxSeries := int(req.MaxSeries)
43 if maxSeriesLimit > 0 && int(req.MaxSeries) > maxSeriesLimit || req.MaxSeries == 0 {
44 maxSeries = maxSeriesLimit
45 }
46 combiner, err := traceql.QueryRangeCombinerFor(req, traceql.AggregateModeFinal, maxSeries)
47 if err != nil {
48 return nil, err
49 }
50
51 completionTracker := &shardtracker.CompletionTracker{}
52 maxSeriesReachedErrorMsg := fmt.Sprintf("Response exceeds maximum series limit of %d, a partial response is returned. Warning: the accuracy of each individual value is not guaranteed.", maxSeries)
53
54 metricsCombiner := NewQueryRangeMetricsCombiner()
55 lastCompletedThrough := shardtracker.TimestampNever
56 c := &genericCombiner[*tempopb.QueryRangeResponse]{
57 httpStatusCode: 200,
58 new: func() *tempopb.QueryRangeResponse { return &tempopb.QueryRangeResponse{} },
59 current: &tempopb.QueryRangeResponse{Metrics: &tempopb.SearchMetrics{}},
60 combine: func(partial *tempopb.QueryRangeResponse, _ *tempopb.QueryRangeResponse, resp PipelineResponse) error {
61 combiner.Combine(partial)
62 metricsCombiner.Combine(partial.Metrics, resp)
63
64 // Track shard completion
65 if shardIdx, ok := resp.RequestData().(int); ok {
66 completionTracker.AddShardIdx(shardIdx)
67 }
68
69 return nil
70 },
71 metadata: func(resp PipelineResponse, _ *tempopb.QueryRangeResponse) error {
72 if qr, ok := resp.(*QueryRangeJobResponse); ok && qr != nil {
73 qrMetrics := &tempopb.SearchMetrics{
74 TotalBlocks: uint32(qr.TotalBlocks), //nolint:gosec
75 TotalJobs: uint32(qr.TotalJobs), //nolint:gosec
76 TotalBlockBytes: qr.TotalBytes,
77 }
78 metricsCombiner.Combine(qrMetrics, resp)
79
80 completionTracker.AddShards(qr.Shards)
81 }
82 return nil
83 },
84 finalize: func(_ *tempopb.QueryRangeResponse) (*tempopb.QueryRangeResponse, error) {
85 resp := combiner.Response()
86 if resp == nil {
87 resp = &tempopb.QueryRangeResponse{}
88 }
89
90 sortResponse(resp)
91 if combiner.MaxSeriesReached() {
92 // Truncating the final response because even if we bail as soon as len(resp.Series) >= maxSeries
93 // it's possible that the last response pushed us over the max series limit.
94 if len(resp.Series) > maxSeries {
95 resp.Series = resp.Series[:maxSeries]
96 }

Calls 14

Combine (Method) · 0.95
AddShardIdx (Method) · 0.95
AddShards (Method) · 0.95
QueryRangeCombinerFor (Function) · 0.92
sortResponse (Function) · 0.85
attachExemplars (Function) · 0.85
initHTTPCombiner (Function) · 0.85
Response (Method) · 0.80
MaxSeriesReached (Method) · 0.80