doReceive returns true when it is done and false otherwise. If it is not done, the second return value holds the time to wait before calling it again.
()
| 100 | // doReceive returns true when it is done, false otherwise. |
| 101 | // If it is not done the second return value holds the time to wait before calling it again. |
| 102 | func (rw *RetryWatcher) doReceive() (bool, time.Duration) { |
| 103 | watcher, err := rw.watcherClient.Watch(metav1.ListOptions{ |
| 104 | ResourceVersion: rw.lastResourceVersion, |
| 105 | }) |
| 106 | // We are very unlikely to hit EOF here since we are just establishing the call, |
| 107 | // but it may happen that the apiserver is just shutting down (e.g. being restarted) |
| 108 | // This is consistent with how it is handled for informers |
| 109 | switch err { |
| 110 | case nil: |
| 111 | break |
| 112 | |
| 113 | case io.EOF: |
| 114 | // watch closed normally |
| 115 | return false, 0 |
| 116 | |
| 117 | case io.ErrUnexpectedEOF: |
| 118 | klog.V(1).Infof("Watch closed with unexpected EOF: %v", err) |
| 119 | return false, 0 |
| 120 | |
| 121 | default: |
| 122 | msg := "Watch failed: %v" |
| 123 | if net.IsProbableEOF(err) { |
| 124 | klog.V(5).Infof(msg, err) |
| 125 | // Retry |
| 126 | return false, 0 |
| 127 | } |
| 128 | |
| 129 | klog.Errorf(msg, err) |
| 130 | // Retry |
| 131 | return false, 0 |
| 132 | } |
| 133 | |
| 134 | if watcher == nil { |
| 135 | klog.Error("Watch returned nil watcher") |
| 136 | // Retry |
| 137 | return false, 0 |
| 138 | } |
| 139 | |
| 140 | ch := watcher.ResultChan() |
| 141 | defer watcher.Stop() |
| 142 | |
| 143 | for { |
| 144 | select { |
| 145 | case <-rw.stopChan: |
| 146 | klog.V(4).Info("Stopping RetryWatcher.") |
| 147 | return true, 0 |
| 148 | case event, ok := <-ch: |
| 149 | if !ok { |
| 150 | klog.V(4).Infof("Failed to get event! Re-creating the watcher. Last RV: %s", rw.lastResourceVersion) |
| 151 | return false, 0 |
| 152 | } |
| 153 | |
| 154 | // We need to inspect the event and get ResourceVersion out of it |
| 155 | switch event.Type { |
| 156 | case watch.Added, watch.Modified, watch.Deleted, watch.Bookmark: |
| 157 | metaObject, ok := event.Object.(resourceVersionGetter) |
| 158 | if !ok { |
| 159 | _ = rw.send(watch.Event{ |
no test coverage detected