MCPcopy
hub / github.com/grafana/tempo / reportResponseUpstream

Function reportResponseUpstream

modules/frontend/v1/frontend.go:307–342  ·  view source on GitHub ↗
(reqBatch *requestBatch, errs chan error, resps chan *frontendv1pb.ClientToFrontend)

Source from the content-addressed store, hash-verified

305}
306
307func reportResponseUpstream(reqBatch *requestBatch, errs chan error, resps chan *frontendv1pb.ClientToFrontend) error {
308 stopCh := make(chan struct{})
309 defer close(stopCh)
310
311 select {
312 // If the upstream request is cancelled, we need to cancel the
313 // downstream req. Only way we can do that is to close the stream.
314 // The worker client is expecting this semantics.
315 case <-reqBatch.doneChan(stopCh):
316 return reqBatch.contextError()
317
318 // Is there was an error handling this request due to network IO,
319 // then error out this upstream request _and_ stream.
320 // The assumption appears to be that the querier will reestablish in the event of this kind
321 // of error.
322 case err := <-errs:
323 reqBatch.reportErrorToPipeline(err)
324 return err
325
326 // Happy path :D
327 case resp := <-resps:
328 // todo: like above support for batches and single requests
329 // can be removed in a few versions once all queriers support batching
330 var err error
331 if len(resp.HttpResponseBatch) == 0 {
332 err = reqBatch.reportResultsToPipeline([]*httpgrpc.HTTPResponse{resp.HttpResponse})
333 } else {
334 err = reqBatch.reportResultsToPipeline(resp.HttpResponseBatch)
335 }
336 if err != nil {
337 return fmt.Errorf("unexpected error reporting results upstream: %w", err)
338 }
339 }
340
341 return nil
342}
343
344func (f *Frontend) NotifyClientShutdown(_ context.Context, req *frontendv1pb.NotifyClientShutdownRequest) (*frontendv1pb.NotifyClientShutdownResponse, error) {
345 level.Info(f.log).Log("msg", "received shutdown notification from querier", "querier", req.GetClientID())

Callers 1

ProcessMethod · 0.85

Calls 4

doneChanMethod · 0.80
contextErrorMethod · 0.80
reportErrorToPipelineMethod · 0.80

Tested by

no test coverage detected