newSearchHTTPHandler returns a handler that returns a single response from the HTTP handler
(cfg Config, next pipeline.AsyncRoundTripper[combiner.PipelineResponse], o overrides.Interface, logger log.Logger, dataAccessController DataAccessController)
| 86 | |
| 87 | // newSearchHTTPHandler returns a handler that returns a single response from the HTTP handler |
| 88 | func newSearchHTTPHandler(cfg Config, next pipeline.AsyncRoundTripper[combiner.PipelineResponse], o overrides.Interface, logger log.Logger, dataAccessController DataAccessController) http.RoundTripper { |
| 89 | postSLOHook := searchSLOPostHook(cfg.Search.SLO) |
| 90 | |
| 91 | return RoundTripperFunc(func(req *http.Request) (*http.Response, error) { |
| 92 | tenant, errResp := extractTenant(req, logger) |
| 93 | if errResp != nil { |
| 94 | return errResp, nil |
| 95 | } |
| 96 | start := time.Now() |
| 97 | |
| 98 | if dataAccessController != nil { |
| 99 | if err := dataAccessController.HandleHTTPSearchReq(req); err != nil { |
| 100 | level.Error(logger).Log("msg", "http search: access control handling failed", "err", err) |
| 101 | return httpInvalidRequest(err), nil |
| 102 | } |
| 103 | } |
| 104 | |
| 105 | // parse request |
| 106 | searchReq, err := api.ParseSearchRequest(req) |
| 107 | if err != nil { |
| 108 | level.Error(logger).Log("msg", "search: parse search request failed", "err", err) |
| 109 | return httpInvalidRequest(err), nil |
| 110 | } |
| 111 | |
| 112 | // check marshalling format |
| 113 | marshallingFormat := api.MarshalingFormatFromAcceptHeader(req.Header) |
| 114 | |
| 115 | comb, err := newCombiner(searchReq, cfg.Search.Sharder, marshallingFormat, o.LeftPadTraceIDs(tenant)) |
| 116 | if err != nil { |
| 117 | level.Error(logger).Log("msg", "search: could not create combiner", "err", err) |
| 118 | return httpInvalidRequest(err), nil |
| 119 | } |
| 120 | |
| 121 | logRequest(logger, tenant, searchReq) |
| 122 | |
| 123 | // build and use roundtripper |
| 124 | rt := pipeline.NewHTTPCollector(next, cfg.ResponseConsumers, comb) |
| 125 | |
| 126 | resp, err := rt.RoundTrip(req) |
| 127 | |
| 128 | // ask for the typed diff and use that for the SLO hook. it will have up to date metrics |
| 129 | var bytesProcessed uint64 |
| 130 | searchResp, _ := comb.GRPCDiff() |
| 131 | if searchResp != nil && searchResp.Metrics != nil { |
| 132 | bytesProcessed = searchResp.Metrics.InspectedBytes |
| 133 | } |
| 134 | |
| 135 | duration := time.Since(start) |
| 136 | postSLOHook(resp, tenant, bytesProcessed, duration, err) |
| 137 | logResult(req.Context(), logger, tenant, duration.Seconds(), searchReq, searchResp, resp, err) |
| 138 | return resp, err |
| 139 | }) |
| 140 | } |
| 141 | |
| 142 | func newCombiner(req *tempopb.SearchRequest, cfg SearchSharderConfig, marshalingFormat api.MarshallingFormat, padTraceIDs bool) (combiner.GRPCCombiner[*tempopb.SearchResponse], error) { |
| 143 | limit, err := adjustLimit(req.Limit, cfg.DefaultLimit, cfg.MaxLimit) |
no test coverage detected