newSearchStreamingGRPCHandler returns a handler that streams results from the HTTP handler
(cfg Config, next pipeline.AsyncRoundTripper[combiner.PipelineResponse], apiPrefix string, o overrides.Interface, logger log.Logger, dataAccessController DataAccessController)
| 27 | |
| 28 | // newSearchStreamingGRPCHandler returns a handler that streams results from the HTTP handler |
| 29 | func newSearchStreamingGRPCHandler(cfg Config, next pipeline.AsyncRoundTripper[combiner.PipelineResponse], apiPrefix string, o overrides.Interface, logger log.Logger, dataAccessController DataAccessController) streamingSearchHandler { |
| 30 | postSLOHook := searchSLOPostHook(cfg.Search.SLO) |
| 31 | downstreamPath := path.Join(apiPrefix, api.PathSearch) |
| 32 | |
| 33 | return func(req *tempopb.SearchRequest, srv tempopb.StreamingQuerier_SearchServer) error { |
| 34 | ctx := srv.Context() |
| 35 | |
| 36 | if dataAccessController != nil { |
| 37 | err := dataAccessController.HandleGRPCSearchReq(ctx, req) |
| 38 | if err != nil { |
| 39 | level.Error(logger).Log("msg", "search streaming: access control handling failed", "err", err) |
| 40 | return err |
| 41 | } |
| 42 | } |
| 43 | |
| 44 | headers := headersFromGrpcContext(ctx) |
| 45 | |
| 46 | httpReq, err := api.BuildSearchRequest(&http.Request{ |
| 47 | URL: &url.URL{Path: downstreamPath}, |
| 48 | Header: headers, |
| 49 | Body: io.NopCloser(bytes.NewReader([]byte{})), |
| 50 | }, req) |
| 51 | if err != nil { |
| 52 | level.Error(logger).Log("msg", "search streaming: build search request failed", "err", err) |
| 53 | return status.Errorf(codes.InvalidArgument, "build search request failed: %s", err.Error()) |
| 54 | } |
| 55 | |
| 56 | httpReq = httpReq.WithContext(ctx) |
| 57 | tenant, _ := user.ExtractOrgID(ctx) |
| 58 | start := time.Now() |
| 59 | |
| 60 | comb, err := newCombiner(req, cfg.Search.Sharder, api.MarshallingFormatProtobuf, o.LeftPadTraceIDs(tenant)) |
| 61 | if err != nil { |
| 62 | level.Error(logger).Log("msg", "search streaming: could not create combiner", "err", err) |
| 63 | return status.Error(codes.InvalidArgument, err.Error()) |
| 64 | |
| 65 | } |
| 66 | |
| 67 | var finalResponse *tempopb.SearchResponse |
| 68 | collector := pipeline.NewGRPCCollector[*tempopb.SearchResponse](next, cfg.ResponseConsumers, cfg.MaxGRPCStreamingPacketSize, comb, func(sr *tempopb.SearchResponse) error { |
| 69 | finalResponse = sr // sadly we can't srv.Send directly into the collector. we need bytesProcessed for the SLO calculations |
| 70 | return srv.Send(sr) |
| 71 | }) |
| 72 | |
| 73 | logRequest(logger, tenant, req) |
| 74 | err = collector.RoundTrip(httpReq) |
| 75 | |
| 76 | duration := time.Since(start) |
| 77 | bytesProcessed := uint64(0) |
| 78 | if finalResponse != nil && finalResponse.Metrics != nil { |
| 79 | bytesProcessed = finalResponse.Metrics.InspectedBytes |
| 80 | } |
| 81 | postSLOHook(nil, tenant, bytesProcessed, duration, err) |
| 82 | logResult(ctx, logger, tenant, duration.Seconds(), req, finalResponse, nil, err) |
| 83 | return err |
| 84 | } |
| 85 | } |
| 86 |
no test coverage detected