MCPcopy
hub / github.com/grpc/grpc-go / NewStream

Method NewStream

internal/transport/http2_client.go:757–944  ·  view source on GitHub ↗

NewStream creates a stream and registers it into the transport as "active" streams. All non-nil errors returned will be *NewStreamError.

(ctx context.Context, callHdr *CallHdr, handler stats.Handler)

Source from the content-addressed store, hash-verified

755// NewStream creates a stream and registers it into the transport as "active"
756// streams. All non-nil errors returned will be *NewStreamError.
757func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr, handler stats.Handler) (*ClientStream, error) {
758 ctx = peer.NewContext(ctx, t.Peer())
759
760 // ServerName field of the resolver returned address takes precedence over
761 // Host field of CallHdr to determine the :authority header. This is because,
762 // the ServerName field takes precedence for server authentication during
763 // TLS handshake, and the :authority header should match the value used
764 // for server authentication.
765 if t.address.ServerName != "" {
766 newCallHdr := *callHdr
767 newCallHdr.Host = t.address.ServerName
768 callHdr = &newCallHdr
769 }
770
771 // The authority specified via the `CallAuthority` CallOption takes the
772 // highest precedence when determining the `:authority` header. It overrides
773 // any value present in the Host field of CallHdr. Before applying this
774 // override, the authority string is validated. If the credentials do not
775 // implement the AuthorityValidator interface, or if validation fails, the
776 // RPC is failed with a status code of `UNAVAILABLE`.
777 if callHdr.Authority != "" {
778 auth, ok := t.authInfo.(credentials.AuthorityValidator)
779 if !ok {
780 return nil, &NewStreamError{Err: status.Errorf(codes.Unavailable, "credentials type %q does not implement the AuthorityValidator interface, but authority override specified with CallAuthority call option", t.authInfo.AuthType())}
781 }
782 if err := auth.ValidateAuthority(callHdr.Authority); err != nil {
783 return nil, &NewStreamError{Err: status.Errorf(codes.Unavailable, "failed to validate authority %q : %v", callHdr.Authority, err)}
784 }
785 newCallHdr := *callHdr
786 newCallHdr.Host = callHdr.Authority
787 callHdr = &newCallHdr
788 }
789
790 headerFields, err := t.createHeaderFields(ctx, callHdr)
791 if err != nil {
792 return nil, &NewStreamError{Err: err, AllowTransparentRetry: false}
793 }
794 s := t.newStream(ctx, callHdr, handler)
795 cleanup := func(err error) {
796 if s.swapState(streamDone) == streamDone {
797 // If it was already done, return.
798 return
799 }
800 // The stream was unprocessed by the server.
801 s.unprocessed.Store(true)
802 s.write(recvMsg{err: err})
803 close(s.done)
804 // If headerChan isn't closed, then close it.
805 if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
806 close(s.headerChan)
807 }
808 }
809 hdr := &headerFrame{
810 hf: headerFields,
811 endStream: false,
812 initStream: func(uint32) error {
813 t.mu.Lock()
814 // TODO: handle transport closure in loopy instead and remove this

Callers

nothing calls this directly

Calls 15

PeerMethod · 0.95
createHeaderFieldsMethod · 0.95
newStreamMethod · 0.95
writeMethod · 0.95
GracefulCloseMethod · 0.95
NewContextFunction · 0.92
ErrorfFunction · 0.92
IsOnFunction · 0.92
FromOutgoingContextFunction · 0.92
PairsFunction · 0.92
ContextErrFunction · 0.85
swapStateMethod · 0.80

Tested by

no test coverage detected