Build returns a gRPC-based clients.Transport. The Extension field of the ServerIdentifier must be a ServerIdentifierExtension.
(si clients.ServerIdentifier)
| 87 | // |
| 88 | // The Extension field of the ServerIdentifier must be a ServerIdentifierExtension. |
| 89 | func (b *Builder) Build(si clients.ServerIdentifier) (clients.Transport, error) { |
| 90 | if si.ServerURI == "" { |
| 91 | return nil, fmt.Errorf("grpctransport: ServerURI is not set in ServerIdentifier") |
| 92 | } |
| 93 | if si.Extensions == nil { |
| 94 | return nil, fmt.Errorf("grpctransport: Extensions is not set in ServerIdentifier") |
| 95 | } |
| 96 | sce, ok := si.Extensions.(ServerIdentifierExtension) |
| 97 | if !ok { |
| 98 | return nil, fmt.Errorf("grpctransport: Extensions field is %T, but must be %T in ServerIdentifier", si.Extensions, ServerIdentifierExtension{}) |
| 99 | } |
| 100 | |
| 101 | config, ok := b.configs[sce.ConfigName] |
| 102 | if !ok { |
| 103 | return nil, fmt.Errorf("grpctransport: unknown config name %q specified in ServerIdentifierExtension", sce.ConfigName) |
| 104 | } |
| 105 | if config.Credentials == nil { |
| 106 | return nil, fmt.Errorf("grpctransport: config %q has nil credentials bundle", sce.ConfigName) |
| 107 | } |
| 108 | |
| 109 | b.mu.Lock() |
| 110 | defer b.mu.Unlock() |
| 111 | |
| 112 | if cc, ok := b.connections[si]; ok { |
| 113 | if logger.V(2) { |
| 114 | logger.Infof("Reusing existing connection to the server for ServerIdentifier: %v", si) |
| 115 | } |
| 116 | b.refs[si]++ |
| 117 | tr := &grpcTransport{cc: cc} |
| 118 | tr.cleanup = b.cleanupFunc(si, tr) |
| 119 | return tr, nil |
| 120 | } |
| 121 | |
| 122 | // Create a new gRPC client/channel for the server with the provided |
| 123 | // credentials, server URI, and a byte codec to send and receive messages. |
| 124 | // Also set a static keepalive configuration that is common across gRPC |
| 125 | // language implementations. |
| 126 | kpCfg := grpc.WithKeepaliveParams(keepalive.ClientParameters{ |
| 127 | Time: 5 * time.Minute, |
| 128 | Timeout: 20 * time.Second, |
| 129 | }) |
| 130 | dopts := []grpc.DialOption{kpCfg, grpc.WithCredentialsBundle(config.Credentials), grpc.WithDefaultCallOptions(grpc.ForceCodec(&byteCodec{}))} |
| 131 | newClientFunc := grpc.NewClient |
| 132 | if config.GRPCNewClient != nil { |
| 133 | newClientFunc = config.GRPCNewClient |
| 134 | } |
| 135 | cc, err := newClientFunc(si.ServerURI, dopts...) |
| 136 | if err != nil { |
| 137 | return nil, fmt.Errorf("grpctransport: failed to create connection to server %q: %v", si.ServerURI, err) |
| 138 | } |
| 139 | tr := &grpcTransport{cc: cc} |
| 140 | // Register a cleanup function that decrements the refs to the gRPC |
| 141 | // transport each time Close() is called to close it and remove from |
| 142 | // transports and connections map if last reference is being released. |
| 143 | tr.cleanup = b.cleanupFunc(si, tr) |
| 144 | |
| 145 | // Add the newly created connection to the maps to re-use the transport |
| 146 | // channel and track references. |