receive reads the result from a watcher, restarting it if necessary.
()
| 238 | |
| 239 | // receive reads the result from a watcher, restarting it if necessary. |
| 240 | func (rw *RetryWatcher) receive() { |
| 241 | defer close(rw.doneChan) |
| 242 | defer close(rw.resultChan) |
| 243 | |
| 244 | klog.V(4).Info("Starting RetryWatcher.") |
| 245 | defer klog.V(4).Info("Stopping RetryWatcher.") |
| 246 | |
| 247 | ctx, cancel := context.WithCancel(context.Background()) |
| 248 | defer cancel() |
| 249 | go func() { |
| 250 | select { |
| 251 | case <-rw.stopChan: |
| 252 | cancel() |
| 253 | return |
| 254 | case <-ctx.Done(): |
| 255 | return |
| 256 | } |
| 257 | }() |
| 258 | |
| 259 | // We use non sliding until so we don't introduce delays on happy path when WATCH call |
| 260 | // timeouts or gets closed and we need to reestablish it while also avoiding hot loops. |
| 261 | wait.NonSlidingUntilWithContext(ctx, func(ctx context.Context) { |
| 262 | done, retryAfter := rw.doReceive() |
| 263 | if done { |
| 264 | cancel() |
| 265 | return |
| 266 | } |
| 267 | |
| 268 | time.Sleep(retryAfter) |
| 269 | |
| 270 | klog.V(4).Infof("Restarting RetryWatcher at RV=%q", rw.lastResourceVersion) |
| 271 | }, rw.minRestartDelay) |
| 272 | } |
| 273 | |
| 274 | // ResultChan implements Interface. |
| 275 | func (rw *RetryWatcher) ResultChan() <-chan watch.Event { |
no test coverage detected