RoundTrip implements http.RoundTripper. It executes up to concurrentRequests simultaneously, with each request scanning ~targetMBsPerRequest, until limit results are found.
(pipelineRequest pipeline.Request)
| 74 | // execute up to concurrentRequests simultaneously where each request scans ~targetMBsPerRequest |
| 75 | // until limit results are found |
| 76 | func (s asyncSearchSharder) RoundTrip(pipelineRequest pipeline.Request) (pipeline.Responses[combiner.PipelineResponse], error) { |
| 77 | r := pipelineRequest.HTTPRequest() |
| 78 | |
| 79 | // Use configured default (defaults to 3 if not set in config) |
| 80 | // If default_spans_per_span_set=0 is explicitly configured, it means unlimited (return all matching spans) |
| 81 | searchReq, err := api.ParseSearchRequestWithDefault(r, s.cfg.DefaultSpansPerSpanSet) |
| 82 | if err != nil { |
| 83 | return pipeline.NewBadRequest(err), nil |
| 84 | } |
| 85 | |
| 86 | searchReq.SkipASTTransformations = mergeSkipASTTransformations(s.skipASTTransformations, searchReq.SkipASTTransformations) |
| 87 | |
| 88 | // adjust limit based on config |
| 89 | searchReq.Limit, err = adjustLimit(searchReq.Limit, s.cfg.DefaultLimit, s.cfg.MaxLimit) |
| 90 | if err != nil { |
| 91 | return pipeline.NewBadRequest(err), nil |
| 92 | } |
| 93 | |
| 94 | requestCtx := r.Context() |
| 95 | tenantID, err := validation.ExtractValidTenantID(requestCtx) |
| 96 | if err != nil { |
| 97 | return pipeline.NewBadRequest(err), nil |
| 98 | } |
| 99 | ctx, span := tracer.Start(requestCtx, "frontend.ShardSearch") |
| 100 | defer span.End() |
| 101 | |
| 102 | // calculate and enforce max search duration |
| 103 | maxDuration := s.maxDuration(tenantID) |
| 104 | if maxDuration != 0 && time.Duration(searchReq.End-searchReq.Start)*time.Second > maxDuration { |
| 105 | return pipeline.NewBadRequest(fmt.Errorf("range specified by start and end exceeds %s. received start=%d end=%d", maxDuration, searchReq.Start, searchReq.End)), nil |
| 106 | } |
| 107 | |
| 108 | // Validate SpansPerSpanSet against MaxSpansPerSpanSet |
| 109 | // If MaxSpansPerSpanSet is 0, it means unlimited spans are allowed |
| 110 | // If MaxSpansPerSpanSet is non-zero, enforce the limit |
| 111 | if s.cfg.MaxSpansPerSpanSet != 0 && searchReq.SpansPerSpanSet > s.cfg.MaxSpansPerSpanSet { |
| 112 | return pipeline.NewBadRequest(fmt.Errorf("spans per span set exceeds %d. received %d", s.cfg.MaxSpansPerSpanSet, searchReq.SpansPerSpanSet)), nil |
| 113 | } |
| 114 | |
| 115 | // buffer of shards+1 allows us to insert ingestReq and metrics |
| 116 | reqCh := make(chan pipeline.Request, s.cfg.IngesterShards+1) |
| 117 | |
| 118 | // build request to search ingesters based on query_backend_after config and time range |
| 119 | // pass subCtx in requests so we can cancel and exit early |
| 120 | jobMetrics, err := s.ingesterRequests(tenantID, pipelineRequest, *searchReq, reqCh) |
| 121 | if err != nil { |
| 122 | return nil, err |
| 123 | } |
| 124 | |
| 125 | // pass subCtx in requests so we can cancel and exit early |
| 126 | s.backendRequests(ctx, tenantID, pipelineRequest, searchReq, jobMetrics, reqCh, func(err error) { |
| 127 | // todo: actually find a way to return this error to the user |
| 128 | s.logger.Log("msg", "search: failed to build backend requests", "err", err) |
| 129 | }) |
| 130 | |
| 131 | s.jobsPerQuery.WithLabelValues(searchOp).Observe(float64(jobMetrics.TotalJobs)) |
| 132 | |
| 133 | // execute requests |
nothing calls this directly
no test coverage detected