MCPcopy
hub / github.com/grafana/tempo / newSearchStreamingGRPCHandler

Function newSearchStreamingGRPCHandler

modules/frontend/search_handlers.go:29–85  ·  view source on GitHub ↗

newSearchStreamingGRPCHandler returns a handler that streams results from the HTTP handler

(cfg Config, next pipeline.AsyncRoundTripper[combiner.PipelineResponse], apiPrefix string, o overrides.Interface, logger log.Logger, dataAccessController DataAccessController)

Source from the content-addressed store, hash-verified

27
28// newSearchStreamingGRPCHandler returns a handler that streams results from the HTTP handler
29func newSearchStreamingGRPCHandler(cfg Config, next pipeline.AsyncRoundTripper[combiner.PipelineResponse], apiPrefix string, o overrides.Interface, logger log.Logger, dataAccessController DataAccessController) streamingSearchHandler {
30 postSLOHook := searchSLOPostHook(cfg.Search.SLO)
31 downstreamPath := path.Join(apiPrefix, api.PathSearch)
32
33 return func(req *tempopb.SearchRequest, srv tempopb.StreamingQuerier_SearchServer) error {
34 ctx := srv.Context()
35
36 if dataAccessController != nil {
37 err := dataAccessController.HandleGRPCSearchReq(ctx, req)
38 if err != nil {
39 level.Error(logger).Log("msg", "search streaming: access control handling failed", "err", err)
40 return err
41 }
42 }
43
44 headers := headersFromGrpcContext(ctx)
45
46 httpReq, err := api.BuildSearchRequest(&http.Request{
47 URL: &url.URL{Path: downstreamPath},
48 Header: headers,
49 Body: io.NopCloser(bytes.NewReader([]byte{})),
50 }, req)
51 if err != nil {
52 level.Error(logger).Log("msg", "search streaming: build search request failed", "err", err)
53 return status.Errorf(codes.InvalidArgument, "build search request failed: %s", err.Error())
54 }
55
56 httpReq = httpReq.WithContext(ctx)
57 tenant, _ := user.ExtractOrgID(ctx)
58 start := time.Now()
59
60 comb, err := newCombiner(req, cfg.Search.Sharder, api.MarshallingFormatProtobuf, o.LeftPadTraceIDs(tenant))
61 if err != nil {
62 level.Error(logger).Log("msg", "search streaming: could not create combiner", "err", err)
63 return status.Error(codes.InvalidArgument, err.Error())
64
65 }
66
67 var finalResponse *tempopb.SearchResponse
68 collector := pipeline.NewGRPCCollector[*tempopb.SearchResponse](next, cfg.ResponseConsumers, cfg.MaxGRPCStreamingPacketSize, comb, func(sr *tempopb.SearchResponse) error {
69 finalResponse = sr // sadly we can't srv.Send directly into the collector. we need bytesProcessed for the SLO calculations
70 return srv.Send(sr)
71 })
72
73 logRequest(logger, tenant, req)
74 err = collector.RoundTrip(httpReq)
75
76 duration := time.Since(start)
77 bytesProcessed := uint64(0)
78 if finalResponse != nil && finalResponse.Metrics != nil {
79 bytesProcessed = finalResponse.Metrics.InspectedBytes
80 }
81 postSLOHook(nil, tenant, bytesProcessed, duration, err)
82 logResult(ctx, logger, tenant, duration.Seconds(), req, finalResponse, nil, err)
83 return err
84 }
85}
86

Callers 1

NewFunction · 0.85

Calls 15

BuildSearchRequestFunction · 0.92
NewGRPCCollectorFunction · 0.92
searchSLOPostHookFunction · 0.85
headersFromGrpcContextFunction · 0.85
newCombinerFunction · 0.85
logRequestFunction · 0.85
logResultFunction · 0.85
HandleGRPCSearchReqMethod · 0.80
JoinMethod · 0.65
ContextMethod · 0.65
LogMethod · 0.65
ErrorMethod · 0.65

Tested by

no test coverage detected