MCPcopy
hub / github.com/grpc/grpc-go / runner

Method runner

internal/xds/clients/lrsclient/lrs_stream.go:104–145  ·  view source on GitHub ↗

runner is responsible for managing the lifetime of an LRS streaming call. It creates the stream, sends the initial LoadStatsRequest, receives the first LoadStatsResponse, and then starts a goroutine to periodically send LoadStatsRequests. The runner will restart the stream if it encounters any errors.

(ctx context.Context)

Source from the content-addressed store, hash-verified

102// LoadStatsRequests. The runner will restart the stream if it encounters any
103// errors.
104func (lrs *streamImpl) runner(ctx context.Context) {
105 defer close(lrs.doneCh)
106
107 // This feature indicates that the client supports the
108 // LoadStatsResponse.send_all_clusters field in the LRS response.
109 node := proto.Clone(lrs.nodeProto).(*v3corepb.Node)
110 node.ClientFeatures = append(node.ClientFeatures, "envoy.lrs.supports_send_all_clusters")
111
112 runLoadReportStream := func() error {
113 // streamCtx is created and canceled in case we terminate the stream
114 // early for any reason, to avoid gRPC-Go leaking the RPC's monitoring
115 // goroutine.
116 streamCtx, cancel := context.WithCancel(ctx)
117 defer cancel()
118
119 stream, err := lrs.transport.NewStream(streamCtx, "/envoy.service.load_stats.v3.LoadReportingService/StreamLoadStats")
120 if err != nil {
121 lrs.logger.Warningf("Failed to create new LRS streaming RPC: %v", err)
122 return nil
123 }
124 if lrs.logger.V(2) {
125 lrs.logger.Infof("LRS stream created")
126 }
127
128 if err := lrs.sendFirstLoadStatsRequest(stream, node); err != nil {
129 lrs.logger.Warningf("Sending first LRS request failed: %v", err)
130 return nil
131 }
132
133 clusters, interval, err := lrs.recvFirstLoadStatsResponse(stream)
134 if err != nil {
135 lrs.logger.Warningf("Reading from LRS streaming RPC failed: %v", err)
136 return nil
137 }
138
139 // We reset backoff state when we successfully receive at least one
140 // message from the server.
141 lrs.sendLoads(streamCtx, stream, clusters, interval)
142 return backoff.ErrResetBackoff
143 }
144 backoff.RunF(ctx, runLoadReportStream, lrs.backoff)
145}
146
147// sendLoads is responsible for periodically sending load reports to the LRS
148// server at the specified interval for the specified clusters, until the passed

Callers 1

newStreamImplFunction · 0.95

Calls 9

sendLoadsMethod · 0.95
RunFFunction · 0.92
CloneMethod · 0.65
NewStreamMethod · 0.65
WarningfMethod · 0.65
VMethod · 0.65
InfofMethod · 0.65

Tested by

no test coverage detected