MCPcopy
hub / github.com/grafana/tempo / process

Method process

modules/querier/worker/frontend_processor.go:95–147  ·  view source on GitHub ↗

process loops processing requests on an established stream.

(c frontendv1pb.Frontend_ProcessClient)

Source from the content-addressed store, hash-verified

93
94// process loops processing requests on an established stream.
95func (fp *frontendProcessor) process(c frontendv1pb.Frontend_ProcessClient) error {
96 // Build a child context so we can cancel a query when the stream is closed.
97 ctx, cancel := context.WithCancel(c.Context())
98 defer cancel()
99
100 for {
101 request, err := c.Recv()
102 if err != nil {
103 return err
104 }
105
106 switch request.Type {
107 case frontendv1pb.Type_HTTP_REQUEST:
108 // Handle the request on a "background" goroutine, so we go back to
109 // blocking on c.Recv(). This allows us to detect the stream closing
110 // and cancel the query. We don't actually handle queries in parallel
111 // here, as we're running in lock step with the server - each Recv is
112 // paired with a Send.
113 go func() {
114 resp := fp.runRequest(ctx, request.HttpRequest)
115 err := fp.handleSendError(c.Send(&frontendv1pb.ClientToFrontend{
116 HttpResponse: resp,
117 }))
118 if err != nil {
119 level.Error(fp.log).Log("msg", "error running requests", "err", err)
120 }
121 }()
122
123 case frontendv1pb.Type_GET_ID:
124 err := fp.handleSendError(c.Send(&frontendv1pb.ClientToFrontend{
125 ClientID: fp.querierID,
126 Features: int32(frontendv1pb.Feature_REQUEST_BATCHING),
127 }))
128 if err != nil {
129 return err
130 }
131
132 case frontendv1pb.Type_HTTP_REQUEST_BATCH:
133 go func() {
134 resp := fp.runRequests(ctx, request.HttpRequestBatch)
135 err := fp.handleSendError(c.Send(&frontendv1pb.ClientToFrontend{
136 HttpResponseBatch: resp,
137 }))
138 if err != nil {
139 level.Error(fp.log).Log("msg", "error running batched requests", "err", err)
140 }
141 }()
142
143 default:
144 return fmt.Errorf("unknown request type: %v", request.Type)
145 }
146 }
147}
148
149func (fp *frontendProcessor) runRequests(ctx context.Context, requests []*httpgrpc.HTTPRequest) []*httpgrpc.HTTPResponse {
150 wg := sync.WaitGroup{}

Callers 1

Calls 8

runRequestMethod · 0.95
handleSendErrorMethod · 0.95
runRequestsMethod · 0.95
ContextMethod · 0.65
RecvMethod · 0.65
SendMethod · 0.65
LogMethod · 0.65
ErrorMethod · 0.65

Tested by

no test coverage detected