MCPcopy
hub / github.com/grafana/tempo / Process

Method Process

modules/frontend/v1/frontend.go:216–305  ·  view source on GitHub ↗

Process allows backends to pull requests from the frontend.

(server frontendv1pb.Frontend_ProcessServer)

Source from the content-addressed store, hash-verified

214
215// Process allows backends to pull requests from the frontend.
216func (f *Frontend) Process(server frontendv1pb.Frontend_ProcessServer) error {
217 _, querierFeatures, err := getQuerierInfo(server)
218 if err != nil {
219 return err
220 }
221
222 f.connectedQuerierWorkers.Add(1)
223 defer f.connectedQuerierWorkers.Add(-1)
224
225 lastUserIndex := queue.FirstUser()
226
227 reqBatch := &requestBatch{}
228 batchSize := 1
229 if querierSupportsBatching(querierFeatures) {
230 batchSize = f.cfg.MaxBatchSize
231 }
232 for {
233 reqSlice := make([]queue.Request, batchSize)
234 reqSlice, idx, err := f.requestQueue.GetNextRequestForQuerier(server.Context(), lastUserIndex, reqSlice)
235 if err != nil {
236 return err
237 }
238 lastUserIndex = idx
239
240 reqBatch.clear()
241 for _, reqWrapper := range reqSlice {
242 req := reqWrapper.(*request)
243
244 f.queueDuration.Observe(time.Since(req.enqueueTime).Seconds())
245 req.queueSpan.End()
246
247 // only add if not expired
248 if req.OriginalContext().Err() != nil {
249 continue
250 }
251
252 err = reqBatch.add(req)
253 if err != nil {
254 return fmt.Errorf("unexpected error adding request to batch: %w", err)
255 }
256 }
257
258 // if all requests are expired then continue requesting jobs for this user. this nicely
259 // drains a large expired query for a tenant and allows them to execute a real query
260 if reqBatch.len() == 0 {
261 lastUserIndex = lastUserIndex.ReuseLastUser()
262 continue
263 }
264
265 f.actualBatchSize.Observe(float64(reqBatch.len()))
266
267 // Handle the stream sending & receiving on a goroutine so we can
268 // monitoring the contexts in a select and cancel things appropriately.
269 resps := make(chan *frontendv1pb.ClientToFrontend, 1)
270 errs := make(chan error, 1)
271 go func() {
272 // todo: we are still sending the old Type_HTTP_REQUEST for backwards compat
273 // with queriers that don't support the new Type_HTTP_REQUEST_BATCH. this feature

Callers

nothing calls this directly

Calls 15

clearMethod · 0.95
addMethod · 0.95
lenMethod · 0.95
httpGrpcRequestsMethod · 0.95
FirstUserFunction · 0.92
getQuerierInfoFunction · 0.85
querierSupportsBatchingFunction · 0.85
reportResponseUpstreamFunction · 0.85
OriginalContextMethod · 0.80
ReuseLastUserMethod · 0.80
AddMethod · 0.65

Tested by

no test coverage detected