process loops processing requests on an established stream.
(c frontendv1pb.Frontend_ProcessClient)
| 93 | |
| 94 | // process loops processing requests on an established stream. |
| 95 | func (fp *frontendProcessor) process(c frontendv1pb.Frontend_ProcessClient) error { |
| 96 | // Build a child context so we can cancel a query when the stream is closed. |
| 97 | ctx, cancel := context.WithCancel(c.Context()) |
| 98 | defer cancel() |
| 99 | |
| 100 | for { |
| 101 | request, err := c.Recv() |
| 102 | if err != nil { |
| 103 | return err |
| 104 | } |
| 105 | |
| 106 | switch request.Type { |
| 107 | case frontendv1pb.Type_HTTP_REQUEST: |
| 108 | // Handle the request on a "background" goroutine, so we go back to |
| 109 | // blocking on c.Recv(). This allows us to detect the stream closing |
| 110 | // and cancel the query. We don't actually handle queries in parallel |
| 111 | // here, as we're running in lock step with the server - each Recv is |
| 112 | // paired with a Send. |
| 113 | go func() { |
| 114 | resp := fp.runRequest(ctx, request.HttpRequest) |
| 115 | err := fp.handleSendError(c.Send(&frontendv1pb.ClientToFrontend{ |
| 116 | HttpResponse: resp, |
| 117 | })) |
| 118 | if err != nil { |
| 119 | level.Error(fp.log).Log("msg", "error running requests", "err", err) |
| 120 | } |
| 121 | }() |
| 122 | |
| 123 | case frontendv1pb.Type_GET_ID: |
| 124 | err := fp.handleSendError(c.Send(&frontendv1pb.ClientToFrontend{ |
| 125 | ClientID: fp.querierID, |
| 126 | Features: int32(frontendv1pb.Feature_REQUEST_BATCHING), |
| 127 | })) |
| 128 | if err != nil { |
| 129 | return err |
| 130 | } |
| 131 | |
| 132 | case frontendv1pb.Type_HTTP_REQUEST_BATCH: |
| 133 | go func() { |
| 134 | resp := fp.runRequests(ctx, request.HttpRequestBatch) |
| 135 | err := fp.handleSendError(c.Send(&frontendv1pb.ClientToFrontend{ |
| 136 | HttpResponseBatch: resp, |
| 137 | })) |
| 138 | if err != nil { |
| 139 | level.Error(fp.log).Log("msg", "error running batched requests", "err", err) |
| 140 | } |
| 141 | }() |
| 142 | |
| 143 | default: |
| 144 | return fmt.Errorf("unknown request type: %v", request.Type) |
| 145 | } |
| 146 | } |
| 147 | } |
| 148 | |
| 149 | func (fp *frontendProcessor) runRequests(ctx context.Context, requests []*httpgrpc.HTTPRequest) []*httpgrpc.HTTPResponse { |
| 150 | wg := sync.WaitGroup{} |
no test coverage detected