MCPcopy
hub / github.com/kubernetes/client-go / watchHandler

Method watchHandler

tools/cache/reflector.go:319–387  ·  view source on GitHub ↗

watchHandler watches w and keeps *resourceVersion up to date.

(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{})

Source from the content-addressed store, hash-verified

317
318// watchHandler watches w and keeps *resourceVersion up to date.
319func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
320 start := r.clock.Now()
321 eventCount := 0
322
323 // Stopping the watcher should be idempotent and if we return from this function there's no way
324 // we're coming back in with the same watch interface.
325 defer w.Stop()
326
327loop:
328 for {
329 select {
330 case <-stopCh:
331 return errorStopRequested
332 case err := <-errc:
333 return err
334 case event, ok := <-w.ResultChan():
335 if !ok {
336 break loop
337 }
338 if event.Type == watch.Error {
339 return apierrs.FromObject(event.Object)
340 }
341 if e, a := r.expectedType, reflect.TypeOf(event.Object); e != nil && e != a {
342 utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
343 continue
344 }
345 meta, err := meta.Accessor(event.Object)
346 if err != nil {
347 utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
348 continue
349 }
350 newResourceVersion := meta.GetResourceVersion()
351 switch event.Type {
352 case watch.Added:
353 err := r.store.Add(event.Object)
354 if err != nil {
355 utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
356 }
357 case watch.Modified:
358 err := r.store.Update(event.Object)
359 if err != nil {
360 utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
361 }
362 case watch.Deleted:
363 // TODO: Will any consumers need access to the "last known
364 // state", which is passed in event.Object? If so, may need
365 // to change this.
366 err := r.store.Delete(event.Object)
367 if err != nil {
368 utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
369 }
370 case watch.Bookmark:
371 // A `Bookmark` means watch has synced here, just update the resourceVersion
372 default:
373 utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
374 }
375 *resourceVersion = newResourceVersion
376 r.setLastSyncResourceVersion(newResourceVersion)

Callers 4

TestReflectorStopWatchFunction · 0.95
ListAndWatchMethod · 0.95

Calls 9

NowMethod · 0.65
StopMethod · 0.65
ErrorfMethod · 0.65
GetResourceVersionMethod · 0.65
AddMethod · 0.65
UpdateMethod · 0.65
DeleteMethod · 0.65
ResultChanMethod · 0.45

Tested by 3

TestReflectorStopWatchFunction · 0.76