NewQueryRange returns a query range combiner.
(req *tempopb.QueryRangeRequest, maxSeriesLimit int)
| 37 | |
| 38 | // NewQueryRange returns a query range combiner. |
| 39 | func NewQueryRange(req *tempopb.QueryRangeRequest, maxSeriesLimit int) (Combiner, error) { |
| 40 | // Use the requested MaxSeries unless it is unset (0) or exceeds an enforced limit |
| 41 | // (maxSeriesLimit > 0); in either of those cases fall back to maxSeriesLimit. |
| 42 | maxSeries := int(req.MaxSeries) |
| 43 | if maxSeriesLimit > 0 && int(req.MaxSeries) > maxSeriesLimit || req.MaxSeries == 0 { |
| 44 | maxSeries = maxSeriesLimit |
| 45 | } |
| 46 | combiner, err := traceql.QueryRangeCombinerFor(req, traceql.AggregateModeFinal, maxSeries) |
| 47 | if err != nil { |
| 48 | return nil, err |
| 49 | } |
| 50 | |
| 51 | completionTracker := &shardtracker.CompletionTracker{} |
| 52 | maxSeriesReachedErrorMsg := fmt.Sprintf("Response exceeds maximum series limit of %d, a partial response is returned. Warning: the accuracy of each individual value is not guaranteed.", maxSeries) |
| 53 | |
| 54 | metricsCombiner := NewQueryRangeMetricsCombiner() |
| 55 | lastCompletedThrough := shardtracker.TimestampNever |
| 56 | c := &genericCombiner[*tempopb.QueryRangeResponse]{ |
| 57 | httpStatusCode: 200, |
| 58 | new: func() *tempopb.QueryRangeResponse { return &tempopb.QueryRangeResponse{} }, |
| 59 | current: &tempopb.QueryRangeResponse{Metrics: &tempopb.SearchMetrics{}}, |
| 60 | combine: func(partial *tempopb.QueryRangeResponse, _ *tempopb.QueryRangeResponse, resp PipelineResponse) error { |
| 61 | combiner.Combine(partial) |
| 62 | metricsCombiner.Combine(partial.Metrics, resp) |
| 63 | |
| 64 | // Track shard completion |
| 65 | if shardIdx, ok := resp.RequestData().(int); ok { |
| 66 | completionTracker.AddShardIdx(shardIdx) |
| 67 | } |
| 68 | |
| 69 | return nil |
| 70 | }, |
| 71 | metadata: func(resp PipelineResponse, _ *tempopb.QueryRangeResponse) error { |
| 72 | if qr, ok := resp.(*QueryRangeJobResponse); ok && qr != nil { |
| 73 | qrMetrics := &tempopb.SearchMetrics{ |
| 74 | TotalBlocks: uint32(qr.TotalBlocks), //nolint:gosec |
| 75 | TotalJobs: uint32(qr.TotalJobs), //nolint:gosec |
| 76 | TotalBlockBytes: qr.TotalBytes, |
| 77 | } |
| 78 | metricsCombiner.Combine(qrMetrics, resp) |
| 79 | |
| 80 | completionTracker.AddShards(qr.Shards) |
| 81 | } |
| 82 | return nil |
| 83 | }, |
| 84 | finalize: func(_ *tempopb.QueryRangeResponse) (*tempopb.QueryRangeResponse, error) { |
| 85 | resp := combiner.Response() |
| 86 | if resp == nil { |
| 87 | resp = &tempopb.QueryRangeResponse{} |
| 88 | } |
| 89 | |
| 90 | sortResponse(resp) |
| 91 | if combiner.MaxSeriesReached() { |
| 92 | // Truncating the final response because even if we bail as soon as len(resp.Series) >= maxSeries |
| 93 | // it's possible that the last response pushed us over the max series limit. |
| 94 | if len(resp.Series) > maxSeries { |
| 95 | resp.Series = resp.Series[:maxSeries] |
| 96 | } |