NewStream creates a stream and registers it with the transport as an "active" stream. All non-nil errors returned will be of type *NewStreamError.
(ctx context.Context, callHdr *CallHdr, handler stats.Handler)
| 755 | // NewStream creates a stream and registers it with the transport as an |
| 756 | // "active" stream. All non-nil errors returned will be *NewStreamError. |
| 757 | func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr, handler stats.Handler) (*ClientStream, error) { |
| 758 | ctx = peer.NewContext(ctx, t.Peer()) |
| 759 | |
| 760 | // ServerName field of the resolver returned address takes precedence over |
| 761 | // Host field of CallHdr to determine the :authority header. This is because, |
| 762 | // the ServerName field takes precedence for server authentication during |
| 763 | // TLS handshake, and the :authority header should match the value used |
| 764 | // for server authentication. |
| 765 | if t.address.ServerName != "" { |
| 766 | newCallHdr := *callHdr |
| 767 | newCallHdr.Host = t.address.ServerName |
| 768 | callHdr = &newCallHdr |
| 769 | } |
| 770 | |
| 771 | // The authority specified via the `CallAuthority` CallOption takes the |
| 772 | // highest precedence when determining the `:authority` header. It overrides |
| 773 | // any value present in the Host field of CallHdr. Before applying this |
| 774 | // override, the authority string is validated. If the credentials do not |
| 775 | // implement the AuthorityValidator interface, or if validation fails, the |
| 776 | // RPC is failed with a status code of `UNAVAILABLE`. |
| 777 | if callHdr.Authority != "" { |
| 778 | auth, ok := t.authInfo.(credentials.AuthorityValidator) |
| 779 | if !ok { |
| 780 | return nil, &NewStreamError{Err: status.Errorf(codes.Unavailable, "credentials type %q does not implement the AuthorityValidator interface, but authority override specified with CallAuthority call option", t.authInfo.AuthType())} |
| 781 | } |
| 782 | if err := auth.ValidateAuthority(callHdr.Authority); err != nil { |
| 783 | return nil, &NewStreamError{Err: status.Errorf(codes.Unavailable, "failed to validate authority %q : %v", callHdr.Authority, err)} |
| 784 | } |
| 785 | newCallHdr := *callHdr |
| 786 | newCallHdr.Host = callHdr.Authority |
| 787 | callHdr = &newCallHdr |
| 788 | } |
| 789 | |
| 790 | headerFields, err := t.createHeaderFields(ctx, callHdr) |
| 791 | if err != nil { |
| 792 | return nil, &NewStreamError{Err: err, AllowTransparentRetry: false} |
| 793 | } |
| 794 | s := t.newStream(ctx, callHdr, handler) |
| 795 | cleanup := func(err error) { |
| 796 | if s.swapState(streamDone) == streamDone { |
| 797 | // If it was already done, return. |
| 798 | return |
| 799 | } |
| 800 | // The stream was unprocessed by the server. |
| 801 | s.unprocessed.Store(true) |
| 802 | s.write(recvMsg{err: err}) |
| 803 | close(s.done) |
| 804 | // If headerChan isn't closed, then close it. |
| 805 | if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) { |
| 806 | close(s.headerChan) |
| 807 | } |
| 808 | } |
| 809 | hdr := &headerFrame{ |
| 810 | hf: headerFields, |
| 811 | endStream: false, |
| 812 | initStream: func(uint32) error { |
| 813 | t.mu.Lock() |
| 814 | // TODO: handle transport closure in loopy instead and remove this |
nothing calls this directly
no test coverage detected