runner is responsible for managing the lifetime of an LRS streaming call. It creates the stream, sends the initial LoadStatsRequest, receives the first LoadStatsResponse, and then starts a goroutine to periodically send LoadStatsRequests. The runner will restart the stream if it encounters any error
(ctx context.Context)
| 102 | // LoadStatsRequests. The runner will restart the stream if it encounters any |
| 103 | // errors. |
| 104 | func (lrs *streamImpl) runner(ctx context.Context) { |
| 105 | defer close(lrs.doneCh) |
| 106 | |
| 107 | // This feature indicates that the client supports the |
| 108 | // LoadStatsResponse.send_all_clusters field in the LRS response. |
| 109 | node := proto.Clone(lrs.nodeProto).(*v3corepb.Node) |
| 110 | node.ClientFeatures = append(node.ClientFeatures, "envoy.lrs.supports_send_all_clusters") |
| 111 | |
| 112 | runLoadReportStream := func() error { |
| 113 | // streamCtx is created and canceled in case we terminate the stream |
| 114 | // early for any reason, to avoid gRPC-Go leaking the RPC's monitoring |
| 115 | // goroutine. |
| 116 | streamCtx, cancel := context.WithCancel(ctx) |
| 117 | defer cancel() |
| 118 | |
| 119 | stream, err := lrs.transport.NewStream(streamCtx, "/envoy.service.load_stats.v3.LoadReportingService/StreamLoadStats") |
| 120 | if err != nil { |
| 121 | lrs.logger.Warningf("Failed to create new LRS streaming RPC: %v", err) |
| 122 | return nil |
| 123 | } |
| 124 | if lrs.logger.V(2) { |
| 125 | lrs.logger.Infof("LRS stream created") |
| 126 | } |
| 127 | |
| 128 | if err := lrs.sendFirstLoadStatsRequest(stream, node); err != nil { |
| 129 | lrs.logger.Warningf("Sending first LRS request failed: %v", err) |
| 130 | return nil |
| 131 | } |
| 132 | |
| 133 | clusters, interval, err := lrs.recvFirstLoadStatsResponse(stream) |
| 134 | if err != nil { |
| 135 | lrs.logger.Warningf("Reading from LRS streaming RPC failed: %v", err) |
| 136 | return nil |
| 137 | } |
| 138 | |
| 139 | // We reset backoff state when we successfully receive at least one |
| 140 | // message from the server. |
| 141 | lrs.sendLoads(streamCtx, stream, clusters, interval) |
| 142 | return backoff.ErrResetBackoff |
| 143 | } |
| 144 | backoff.RunF(ctx, runLoadReportStream, lrs.backoff) |
| 145 | } |
| 146 | |
| 147 | // sendLoads is responsible for periodically sending load reports to the LRS |
| 148 | // server at the specified interval for the specified clusters, until the passed |
no test coverage detected