newControlChannel creates a controlChannel to rlsServerName and uses serviceConfig, if non-empty, as the default service config for the underlying gRPC channel.
(rlsServerName, serviceConfig string, rpcTimeout time.Duration, bOpts balancer.BuildOptions, backToReadyFunc func())
| 73 | // serviceConfig, if non-empty, as the default service config for the underlying |
| 74 | // gRPC channel. |
| 75 | func newControlChannel(rlsServerName, serviceConfig string, rpcTimeout time.Duration, bOpts balancer.BuildOptions, backToReadyFunc func()) (*controlChannel, error) { |
| 76 | ctrlCh := &controlChannel{ |
| 77 | rpcTimeout: rpcTimeout, |
| 78 | backToReadyFunc: backToReadyFunc, |
| 79 | throttler: newAdaptiveThrottler(), |
| 80 | } |
| 81 | ctrlCh.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-control-channel %p] ", ctrlCh)) |
| 82 | |
| 83 | dopts, err := ctrlCh.dialOpts(bOpts, serviceConfig) |
| 84 | if err != nil { |
| 85 | return nil, err |
| 86 | } |
| 87 | ctrlCh.cc, err = grpc.NewClient(rlsServerName, dopts...) |
| 88 | if err != nil { |
| 89 | return nil, err |
| 90 | } |
| 91 | // Subscribe to connectivity state before connecting to avoid missing initial |
| 92 | // updates, which are only delivered to active subscribers. |
| 93 | subscribe := internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func()) |
| 94 | ctrlCh.dropConnStateSubscriber = subscribe(ctrlCh.cc, newConnectivityStateSubscriber(ctrlCh)) |
| 95 | ctrlCh.cc.Connect() |
| 96 | ctrlCh.client = rlsgrpc.NewRouteLookupServiceClient(ctrlCh.cc) |
| 97 | ctrlCh.logger.Infof("Control channel created to RLS server at: %v", rlsServerName) |
| 98 | return ctrlCh, nil |
| 99 | } |
| 100 | |
| 101 | func (cc *controlChannel) OnMessage(msg any) { |
| 102 | st, ok := msg.(connectivity.State) |