MCPcopy
hub / github.com/kubernetes/client-go / receive

Method receive

tools/watch/retrywatcher.go:240–272  ·  view source on GitHub ↗

receive reads the result from a watcher, restarting it if necessary.

()

Source from the content-addressed store, hash-verified

238
239// receive reads the result from a watcher, restarting it if necessary.
240func (rw *RetryWatcher) receive() {
241 defer close(rw.doneChan)
242 defer close(rw.resultChan)
243
244 klog.V(4).Info("Starting RetryWatcher.")
245 defer klog.V(4).Info("Stopping RetryWatcher.")
246
247 ctx, cancel := context.WithCancel(context.Background())
248 defer cancel()
249 go func() {
250 select {
251 case <-rw.stopChan:
252 cancel()
253 return
254 case <-ctx.Done():
255 return
256 }
257 }()
258
259 // We use non sliding until so we don't introduce delays on happy path when WATCH call
260 // timeouts or gets closed and we need to reestablish it while also avoiding hot loops.
261 wait.NonSlidingUntilWithContext(ctx, func(ctx context.Context) {
262 done, retryAfter := rw.doReceive()
263 if done {
264 cancel()
265 return
266 }
267
268 time.Sleep(retryAfter)
269
270 klog.V(4).Infof("Restarting RetryWatcher at RV=%q", rw.lastResourceVersion)
271 }, rw.minRestartDelay)
272}
273
274// ResultChan implements Interface.
275func (rw *RetryWatcher) ResultChan() <-chan watch.Event {

Callers 1

newRetryWatcherFunction · 0.95

Calls 3

doReceiveMethod · 0.95
DoneMethod · 0.65
SleepMethod · 0.65

Tested by

no test coverage detected