WatchWithSpecificDecoders attempts to begin watching the requested location with a *different* decoder. It turns out that you want one "standard" decoder for the watch event and one "personal" decoder for the content. Returns a watch.Interface, or an error.
(wrapperDecoderFn func(io.ReadCloser) streaming.Decoder, embeddedDecoder runtime.Decoder)
| 547 | // Turns out that you want one "standard" decoder for the watch event and one "personal" decoder for the content |
| 548 | // Returns a watch.Interface, or an error. |
| 549 | func (r *Request) WatchWithSpecificDecoders(wrapperDecoderFn func(io.ReadCloser) streaming.Decoder, embeddedDecoder runtime.Decoder) (watch.Interface, error) { |
| 550 | // We specifically don't want to rate limit watches, so we |
| 551 | // don't use r.throttle here. |
| 552 | if r.err != nil { |
| 553 | return nil, r.err |
| 554 | } |
| 555 | if r.serializers.Framer == nil { |
| 556 | return nil, fmt.Errorf("watching resources is not possible with this client (content-type: %s)", r.content.ContentType) |
| 557 | } |
| 558 | |
| 559 | url := r.URL().String() |
| 560 | req, err := http.NewRequest(r.verb, url, r.body) |
| 561 | if err != nil { |
| 562 | return nil, err |
| 563 | } |
| 564 | if r.ctx != nil { |
| 565 | req = req.WithContext(r.ctx) |
| 566 | } |
| 567 | req.Header = r.headers |
| 568 | client := r.client |
| 569 | if client == nil { |
| 570 | client = http.DefaultClient |
| 571 | } |
| 572 | r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL())) |
| 573 | resp, err := client.Do(req) |
| 574 | updateURLMetrics(r, resp, err) |
| 575 | if r.baseURL != nil { |
| 576 | if err != nil { |
| 577 | r.backoffMgr.UpdateBackoff(r.baseURL, err, 0) |
| 578 | } else { |
| 579 | r.backoffMgr.UpdateBackoff(r.baseURL, err, resp.StatusCode) |
| 580 | } |
| 581 | } |
| 582 | if err != nil { |
| 583 | // The watch stream mechanism handles many common partial data errors, so closed |
| 584 | // connections can be retried in many cases. |
| 585 | if net.IsProbableEOF(err) { |
| 586 | return watch.NewEmptyWatch(), nil |
| 587 | } |
| 588 | return nil, err |
| 589 | } |
| 590 | if resp.StatusCode != http.StatusOK { |
| 591 | defer resp.Body.Close() |
| 592 | if result := r.transformResponse(resp, req); result.err != nil { |
| 593 | return nil, result.err |
| 594 | } |
| 595 | return nil, fmt.Errorf("for request %s, got status: %v", url, resp.StatusCode) |
| 596 | } |
| 597 | wrapperDecoder := wrapperDecoderFn(resp.Body) |
| 598 | return watch.NewStreamWatcher( |
| 599 | restclientwatch.NewDecoder(wrapperDecoder, embeddedDecoder), |
| 600 | // use 500 to indicate that the cause of the error is unknown - other error codes |
| 601 | // are more specific to HTTP interactions, and set a reason |
| 602 | errors.NewClientErrorReporter(http.StatusInternalServerError, r.verb, "ClientWatchDecoding"), |
| 603 | ), nil |
| 604 | } |
| 605 | |
| 606 | // updateURLMetrics is a convenience function for pushing metrics. |
no test coverage detected