MCPcopy
hub / github.com/grafana/tempo / RoundTrip

Method RoundTrip

modules/frontend/search_sharder.go:76–135  ·  view source on GitHub ↗

RoundTrip implements http.RoundTripper. It executes up to concurrentRequests simultaneously, where each request scans ~targetMBsPerRequest, until limit results are found.

(pipelineRequest pipeline.Request)

Source from the content-addressed store, hash-verified

74// execute up to concurrentRequests simultaneously where each request scans ~targetMBsPerRequest
75// until limit results are found
76func (s asyncSearchSharder) RoundTrip(pipelineRequest pipeline.Request) (pipeline.Responses[combiner.PipelineResponse], error) {
77 r := pipelineRequest.HTTPRequest()
78
79 // Use configured default (defaults to 3 if not set in config)
80 // If default_spans_per_span_set=0 is explicitly configured, it means unlimited (return all matching spans)
81 searchReq, err := api.ParseSearchRequestWithDefault(r, s.cfg.DefaultSpansPerSpanSet)
82 if err != nil {
83 return pipeline.NewBadRequest(err), nil
84 }
85
86 searchReq.SkipASTTransformations = mergeSkipASTTransformations(s.skipASTTransformations, searchReq.SkipASTTransformations)
87
88 // adjust limit based on config
89 searchReq.Limit, err = adjustLimit(searchReq.Limit, s.cfg.DefaultLimit, s.cfg.MaxLimit)
90 if err != nil {
91 return pipeline.NewBadRequest(err), nil
92 }
93
94 requestCtx := r.Context()
95 tenantID, err := validation.ExtractValidTenantID(requestCtx)
96 if err != nil {
97 return pipeline.NewBadRequest(err), nil
98 }
99 ctx, span := tracer.Start(requestCtx, "frontend.ShardSearch")
100 defer span.End()
101
102 // calculate and enforce max search duration
103 maxDuration := s.maxDuration(tenantID)
104 if maxDuration != 0 && time.Duration(searchReq.End-searchReq.Start)*time.Second > maxDuration {
105 return pipeline.NewBadRequest(fmt.Errorf("range specified by start and end exceeds %s. received start=%d end=%d", maxDuration, searchReq.Start, searchReq.End)), nil
106 }
107
108 // Validate SpansPerSpanSet against MaxSpansPerSpanSet
109 // If MaxSpansPerSpanSet is 0, it means unlimited spans are allowed
110 // If MaxSpansPerSpanSet is non-zero, enforce the limit
111 if s.cfg.MaxSpansPerSpanSet != 0 && searchReq.SpansPerSpanSet > s.cfg.MaxSpansPerSpanSet {
112 return pipeline.NewBadRequest(fmt.Errorf("spans per span set exceeds %d. received %d", s.cfg.MaxSpansPerSpanSet, searchReq.SpansPerSpanSet)), nil
113 }
114
115 // buffer of shards+1 allows us to insert ingestReq and metrics
116 reqCh := make(chan pipeline.Request, s.cfg.IngesterShards+1)
117
118 // build request to search ingesters based on query_backend_after config and time range
119 // pass subCtx in requests so we can cancel and exit early
120 jobMetrics, err := s.ingesterRequests(tenantID, pipelineRequest, *searchReq, reqCh)
121 if err != nil {
122 return nil, err
123 }
124
125 // pass subCtx in requests so we can cancel and exit early
126 s.backendRequests(ctx, tenantID, pipelineRequest, searchReq, jobMetrics, reqCh, func(err error) {
127 // todo: actually find a way to return this error to the user
128 s.logger.Log("msg", "search: failed to build backend requests", "err", err)
129 })
130
131 s.jobsPerQuery.WithLabelValues(searchOp).Observe(float64(jobMetrics.TotalJobs))
132
133 // execute requests

Callers

nothing calls this directly

Calls 15

maxDurationMethod · 0.95
ingesterRequestsMethod · 0.95
backendRequestsMethod · 0.95
NewBadRequestFunction · 0.92
ExtractValidTenantIDFunction · 0.92
NewAsyncSharderChanFunction · 0.92
NewAsyncResponseFunction · 0.92
adjustLimitFunction · 0.85
DurationMethod · 0.80
HTTPRequestMethod · 0.65

Tested by

no test coverage detected