| 64 | } |
| 65 | |
| 66 | func newRetryWatcher(initialResourceVersion string, watcherClient cache.Watcher, minRestartDelay time.Duration) (*RetryWatcher, error) { |
| 67 | switch initialResourceVersion { |
| 68 | case "", "0": |
| 69 | // TODO: revisit this if we ever get WATCH v2 where it means start "now" |
| 70 | // without doing the synthetic list of objects at the beginning (see #74022) |
| 71 | return nil, fmt.Errorf("initial RV %q is not supported due to issues with underlying WATCH", initialResourceVersion) |
| 72 | default: |
| 73 | break |
| 74 | } |
| 75 | |
| 76 | rw := &RetryWatcher{ |
| 77 | lastResourceVersion: initialResourceVersion, |
| 78 | watcherClient: watcherClient, |
| 79 | stopChan: make(chan struct{}), |
| 80 | doneChan: make(chan struct{}), |
| 81 | resultChan: make(chan watch.Event, 0), |
| 82 | minRestartDelay: minRestartDelay, |
| 83 | } |
| 84 | |
| 85 | go rw.receive() |
| 86 | return rw, nil |
| 87 | } |
| 88 | |
| 89 | func (rw *RetryWatcher) send(event watch.Event) bool { |
| 90 | // Writing to an unbuffered channel is blocking operation |