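TestControlChannelConnectivityStateMonitoring tests the scenario where the control channel goes down and comes back up again, and verifies that backoff state is reset for cache entries in this scenario. It also verifies that: (1) backoff is NOT reset when the control channel first becomes READY (i.e., the initial CONNECTING → READY transition should not trigger a backoff reset); and (2) backoff is reset for READY → TRANSIENT_FAILURE → READY transitions.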
(t *testing.T)
| 1021 | // the initial CONNECTING → READY transition should not trigger a backoff reset) |
| 1022 | // - Backoff is reset for READY → TRANSIENT_FAILURE → READY transitions |
| 1023 | func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) { |
| 1024 | // Create a restartable listener which can close existing connections. |
| 1025 | l, err := testutils.LocalTCPListener() |
| 1026 | if err != nil { |
| 1027 | t.Fatalf("net.Listen() failed: %v", err) |
| 1028 | } |
| 1029 | lis := testutils.NewRestartableListener(l) |
| 1030 | |
| 1031 | // Start an RLS server with the restartable listener and set the throttler to |
| 1032 | // never throttle requests. |
| 1033 | rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, lis) |
| 1034 | overrideAdaptiveThrottler(t, neverThrottlingThrottler()) |
| 1035 | |
| 1036 | // Override the reset backoff hook to get notified. |
| 1037 | resetBackoffDone := make(chan struct{}, 1) |
| 1038 | origResetBackoffHook := resetBackoffHook |
| 1039 | resetBackoffHook = func() { resetBackoffDone <- struct{}{} } |
| 1040 | defer func() { resetBackoffHook = origResetBackoffHook }() |
| 1041 | |
| 1042 | // Override the backoff strategy to return a large backoff which |
| 1043 | // will make sure the data cache entry remains in backoff for the |
| 1044 | // duration of the test. |
| 1045 | origBackoffStrategy := defaultBackoffStrategy |
| 1046 | defaultBackoffStrategy = &fakeBackoffStrategy{backoff: defaultTestTimeout} |
| 1047 | defer func() { defaultBackoffStrategy = origBackoffStrategy }() |
| 1048 | |
| 1049 | // Override the connectivity state subscriber to wrap the original and |
| 1050 | // make connectivity state changes visible to the test. |
| 1051 | wrappedSubscriber := &wrappingConnectivityStateSubscriber{connStateCh: buffer.NewUnbounded()} |
| 1052 | origConnectivityStateSubscriber := newConnectivityStateSubscriber |
| 1053 | newConnectivityStateSubscriber = func(delegate grpcsync.Subscriber) grpcsync.Subscriber { |
| 1054 | return &wrappingConnectivityStateSubscriber{ |
| 1055 | delegate: delegate, |
| 1056 | connStateCh: wrappedSubscriber.connStateCh, |
| 1057 | } |
| 1058 | } |
| 1059 | defer func() { newConnectivityStateSubscriber = origConnectivityStateSubscriber }() |
| 1060 | |
| 1061 | // Register an LB policy to act as the child policy for RLS LB policy. |
| 1062 | childPolicyName := "test-child-policy" + t.Name() |
| 1063 | e2e.RegisterRLSChildPolicy(childPolicyName, nil) |
| 1064 | t.Logf("Registered child policy with name %q", childPolicyName) |
| 1065 | |
| 1066 | // Build RLS service config with header matchers, and a very low value for |
| 1067 | // maxAge to ensure that cache entries become invalid very soon. |
| 1068 | rlsConfig := buildBasicRLSConfig(childPolicyName, rlsServer.Address) |
| 1069 | rlsConfig.RouteLookupConfig.MaxAge = durationpb.New(defaultTestShortTimeout) |
| 1070 | |
| 1071 | // Start a test backend, and set up the fake RLS server to return this as a |
| 1072 | // target in the RLS response. |
| 1073 | backendCh, backendAddress := startBackend(t) |
| 1074 | rlsServer.SetResponseCallback(func(_ context.Context, _ *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse { |
| 1075 | return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress}}} |
| 1076 | }) |
| 1077 | |
| 1078 | // Register a manual resolver and push the RLS service config through it. |
| 1079 | r := startManualResolverWithConfig(t, rlsConfig) |
| 1080 |
nothing calls this directly
no test coverage detected