(reqBatch *requestBatch, errs chan error, resps chan *frontendv1pb.ClientToFrontend)
| 305 | } |
| 306 | |
| 307 | func reportResponseUpstream(reqBatch *requestBatch, errs chan error, resps chan *frontendv1pb.ClientToFrontend) error { |
| 308 | stopCh := make(chan struct{}) |
| 309 | defer close(stopCh) |
| 310 | |
| 311 | select { |
| 312 | // If the upstream request is cancelled, we need to cancel the |
| 313 | // downstream req. Only way we can do that is to close the stream. |
| 314 | // The worker client is expecting this semantics. |
| 315 | case <-reqBatch.doneChan(stopCh): |
| 316 | return reqBatch.contextError() |
| 317 | |
| 318 | // Is there was an error handling this request due to network IO, |
| 319 | // then error out this upstream request _and_ stream. |
| 320 | // The assumption appears to be that the querier will reestablish in the event of this kind |
| 321 | // of error. |
| 322 | case err := <-errs: |
| 323 | reqBatch.reportErrorToPipeline(err) |
| 324 | return err |
| 325 | |
| 326 | // Happy path :D |
| 327 | case resp := <-resps: |
| 328 | // todo: like above support for batches and single requests |
| 329 | // can be removed in a few versions once all queriers support batching |
| 330 | var err error |
| 331 | if len(resp.HttpResponseBatch) == 0 { |
| 332 | err = reqBatch.reportResultsToPipeline([]*httpgrpc.HTTPResponse{resp.HttpResponse}) |
| 333 | } else { |
| 334 | err = reqBatch.reportResultsToPipeline(resp.HttpResponseBatch) |
| 335 | } |
| 336 | if err != nil { |
| 337 | return fmt.Errorf("unexpected error reporting results upstream: %w", err) |
| 338 | } |
| 339 | } |
| 340 | |
| 341 | return nil |
| 342 | } |
| 343 | |
| 344 | func (f *Frontend) NotifyClientShutdown(_ context.Context, req *frontendv1pb.NotifyClientShutdownRequest) (*frontendv1pb.NotifyClientShutdownResponse, error) { |
| 345 | level.Info(f.log).Log("msg", "received shutdown notification from querier", "querier", req.GetClientID()) |
no test coverage detected