TestProducerStopsBeforeStateChange confirms that producers are stopped before any state change notification is delivered to the LB policy.
(t *testing.T)
| 37 | // TestProducerStopsBeforeStateChange confirms that producers are stopped before |
| 38 | // any state change notification is delivered to the LB policy. |
| 39 | func (s) TestProducerStopsBeforeStateChange(t *testing.T) { |
| 40 | ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| 41 | defer cancel() |
| 42 | |
| 43 | name := strings.ReplaceAll(strings.ToLower(t.Name()), "/", "") |
| 44 | var lastProducer *testProducer |
| 45 | bf := stub.BalancerFuncs{ |
| 46 | UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { |
| 47 | var sc balancer.SubConn |
| 48 | sc, err := bd.ClientConn.NewSubConn(ccs.ResolverState.Addresses, balancer.NewSubConnOptions{ |
| 49 | StateListener: func(scs balancer.SubConnState) { |
| 50 | bd.ClientConn.UpdateState(balancer.State{ |
| 51 | ConnectivityState: scs.ConnectivityState, |
| 52 | // We do not pass a picker, but since we don't perform |
| 53 | // RPCs, that's okay. |
| 54 | }) |
| 55 | if !lastProducer.stopped.Load() { |
| 56 | t.Errorf("lastProducer not stopped before state change notification") |
| 57 | } |
| 58 | t.Logf("State is now %v; recreating producer", scs.ConnectivityState) |
| 59 | p, _ := sc.GetOrBuildProducer(producerBuilderSingleton) |
| 60 | lastProducer = p.(*testProducer) |
| 61 | }, |
| 62 | }) |
| 63 | if err != nil { |
| 64 | return err |
| 65 | } |
| 66 | p, _ := sc.GetOrBuildProducer(producerBuilderSingleton) |
| 67 | lastProducer = p.(*testProducer) |
| 68 | sc.Connect() |
| 69 | return nil |
| 70 | }, |
| 71 | } |
| 72 | stub.Register(name, bf) |
| 73 | |
| 74 | ss := stubserver.StubServer{ |
| 75 | FullDuplexCallF: func(testgrpc.TestService_FullDuplexCallServer) error { |
| 76 | return nil |
| 77 | }, |
| 78 | } |
| 79 | if err := ss.StartServer(); err != nil { |
| 80 | t.Fatal("Error starting server:", err) |
| 81 | } |
| 82 | defer ss.Stop() |
| 83 | |
| 84 | cc, err := grpc.NewClient("dns:///"+ss.Address, |
| 85 | grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"`+name+`":{}}]}`), |
| 86 | grpc.WithTransportCredentials(insecure.NewCredentials()), |
| 87 | ) |
| 88 | if err != nil { |
| 89 | t.Fatalf("Error creating client: %v", err) |
| 90 | } |
| 91 | defer cc.Close() |
| 92 | |
| 93 | go cc.Connect() |
| 94 | testutils.AwaitState(ctx, t, cc, connectivity.Ready) |
| 95 | |
| 96 | cc.Close() |
nothing calls this directly
no test coverage detected