watchHandler watches w and keeps *resourceVersion up to date.
(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{})
| 317 | |
| 318 | // watchHandler watches w and keeps *resourceVersion up to date. |
| 319 | func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error { |
| 320 | start := r.clock.Now() |
| 321 | eventCount := 0 |
| 322 | |
| 323 | // Stopping the watcher should be idempotent, and once we return from this function there's no way |
| 324 | // we're coming back in with the same watch interface. |
| 325 | defer w.Stop() |
| 326 | |
| 327 | loop: |
| 328 | for { |
| 329 | select { |
| 330 | case <-stopCh: |
| 331 | return errorStopRequested |
| 332 | case err := <-errc: |
| 333 | return err |
| 334 | case event, ok := <-w.ResultChan(): |
| 335 | if !ok { |
| 336 | break loop |
| 337 | } |
| 338 | if event.Type == watch.Error { |
| 339 | return apierrs.FromObject(event.Object) |
| 340 | } |
| 341 | if e, a := r.expectedType, reflect.TypeOf(event.Object); e != nil && e != a { |
| 342 | utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a)) |
| 343 | continue |
| 344 | } |
| 345 | meta, err := meta.Accessor(event.Object) |
| 346 | if err != nil { |
| 347 | utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event)) |
| 348 | continue |
| 349 | } |
| 350 | newResourceVersion := meta.GetResourceVersion() |
| 351 | switch event.Type { |
| 352 | case watch.Added: |
| 353 | err := r.store.Add(event.Object) |
| 354 | if err != nil { |
| 355 | utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err)) |
| 356 | } |
| 357 | case watch.Modified: |
| 358 | err := r.store.Update(event.Object) |
| 359 | if err != nil { |
| 360 | utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err)) |
| 361 | } |
| 362 | case watch.Deleted: |
| 363 | // TODO: Will any consumers need access to the "last known |
| 364 | // state", which is passed in event.Object? If so, may need |
| 365 | // to change this. |
| 366 | err := r.store.Delete(event.Object) |
| 367 | if err != nil { |
| 368 | utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err)) |
| 369 | } |
| 370 | case watch.Bookmark: |
| 371 | // A `Bookmark` means the watch has synced up to this point; just update the resourceVersion. |
| 372 | default: |
| 373 | utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event)) |
| 374 | } |
| 375 | *resourceVersion = newResourceVersion |
| 376 | r.setLastSyncResourceVersion(newResourceVersion) |