MCPcopy
hub / github.com/kubernetes/client-go / WatchWithSpecificDecoders

Method WatchWithSpecificDecoders

rest/request.go:549–604  ·  view source on GitHub ↗

WatchWithSpecificDecoders attempts to begin watching the requested location with a *different* decoder. Turns out that you want one "standard" decoder for the watch event and one "personal" decoder for the content. Returns a watch.Interface, or an error.

(wrapperDecoderFn func(io.ReadCloser) streaming.Decoder, embeddedDecoder runtime.Decoder)

Source from the content-addressed store, hash-verified

547// Turns out that you want one "standard" decoder for the watch event and one "personal" decoder for the content
548// Returns a watch.Interface, or an error.
549func (r *Request) WatchWithSpecificDecoders(wrapperDecoderFn func(io.ReadCloser) streaming.Decoder, embeddedDecoder runtime.Decoder) (watch.Interface, error) {
550 // We specifically don't want to rate limit watches, so we
551 // don't use r.throttle here.
552 if r.err != nil {
553 return nil, r.err
554 }
555 if r.serializers.Framer == nil {
556 return nil, fmt.Errorf("watching resources is not possible with this client (content-type: %s)", r.content.ContentType)
557 }
558
559 url := r.URL().String()
560 req, err := http.NewRequest(r.verb, url, r.body)
561 if err != nil {
562 return nil, err
563 }
564 if r.ctx != nil {
565 req = req.WithContext(r.ctx)
566 }
567 req.Header = r.headers
568 client := r.client
569 if client == nil {
570 client = http.DefaultClient
571 }
572 r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL()))
573 resp, err := client.Do(req)
574 updateURLMetrics(r, resp, err)
575 if r.baseURL != nil {
576 if err != nil {
577 r.backoffMgr.UpdateBackoff(r.baseURL, err, 0)
578 } else {
579 r.backoffMgr.UpdateBackoff(r.baseURL, err, resp.StatusCode)
580 }
581 }
582 if err != nil {
583 // The watch stream mechanism handles many common partial data errors, so closed
584 // connections can be retried in many cases.
585 if net.IsProbableEOF(err) {
586 return watch.NewEmptyWatch(), nil
587 }
588 return nil, err
589 }
590 if resp.StatusCode != http.StatusOK {
591 defer resp.Body.Close()
592 if result := r.transformResponse(resp, req); result.err != nil {
593 return nil, result.err
594 }
595 return nil, fmt.Errorf("for request %s, got status: %v", url, resp.StatusCode)
596 }
597 wrapperDecoder := wrapperDecoderFn(resp.Body)
598 return watch.NewStreamWatcher(
599 restclientwatch.NewDecoder(wrapperDecoder, embeddedDecoder),
600 // use 500 to indicate that the cause of the error is unknown - other error codes
601 // are more specific to HTTP interactions, and set a reason
602 errors.NewClientErrorReporter(http.StatusInternalServerError, r.verb, "ClientWatchDecoding"),
603 ), nil
604}
605
606// updateURLMetrics is a convenience function for pushing metrics.

Callers 2

WatchMethod · 0.95
WatchMethod · 0.80

Calls 10

URLMethod · 0.95
transformResponseMethod · 0.95
updateURLMetricsFunction · 0.85
ErrorfMethod · 0.65
StringMethod · 0.65
SleepMethod · 0.65
CalculateBackoffMethod · 0.65
DoMethod · 0.65
UpdateBackoffMethod · 0.65
CloseMethod · 0.65

Tested by

no test coverage detected