Tests the case where the stream worker goroutine option is enabled, and a number of RPCs are initiated around the same time that Stop() is called. This used to result in a write to a closed channel. This test verifies that there is no panic.
(t *testing.T)
| 109 | // used to result in a write to a closed channel. This test verifies that there |
| 110 | // is no panic. |
| 111 | func (s) TestStreamWorkers_RPCsAndStop(t *testing.T) { |
| 112 | ss := stubserver.StartTestService(t, nil, grpc.NumStreamWorkers(uint32(runtime.NumCPU()))) |
| 113 | // This deferred stop takes care of stopping the server when one of the |
| 114 | // below grpc.NewClient fail, and the test exits early. |
| 115 | defer ss.Stop() |
| 116 | |
| 117 | ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| 118 | defer cancel() |
| 119 | const numChannels = 20 |
| 120 | const numRPCLoops = 20 |
| 121 | |
| 122 | // Create a bunch of clientconns and ensure that they are READY by making an |
| 123 | // RPC on them. |
| 124 | ccs := make([]*grpc.ClientConn, numChannels) |
| 125 | for i := 0; i < numChannels; i++ { |
| 126 | var err error |
| 127 | ccs[i], err = grpc.NewClient(ss.Address, grpc.WithTransportCredentials(insecure.NewCredentials())) |
| 128 | if err != nil { |
| 129 | t.Fatalf("[iteration: %d] grpc.NewClient(%s) failed: %v", i, ss.Address, err) |
| 130 | } |
| 131 | defer ccs[i].Close() |
| 132 | client := testgrpc.NewTestServiceClient(ccs[i]) |
| 133 | if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil { |
| 134 | t.Fatalf("EmptyCall() failed: %v", err) |
| 135 | } |
| 136 | } |
| 137 | |
| 138 | // Make a bunch of concurrent RPCs on the above clientconns. These will |
| 139 | // eventually race with Stop(), and will start to fail. |
| 140 | var wg sync.WaitGroup |
| 141 | for i := 0; i < numChannels; i++ { |
| 142 | client := testgrpc.NewTestServiceClient(ccs[i]) |
| 143 | for j := 0; j < numRPCLoops; j++ { |
| 144 | wg.Add(1) |
| 145 | go func(client testgrpc.TestServiceClient) { |
| 146 | defer wg.Done() |
| 147 | for { |
| 148 | _, err := client.EmptyCall(ctx, &testpb.Empty{}) |
| 149 | if err == nil { |
| 150 | continue |
| 151 | } |
| 152 | if code := status.Code(err); code == codes.Unavailable { |
| 153 | // Once Stop() has been called on the server, we expect |
| 154 | // subsequent calls to fail with Unavailable. |
| 155 | return |
| 156 | } |
| 157 | t.Errorf("EmptyCall() failed: %v", err) |
| 158 | return |
| 159 | } |
| 160 | }(client) |
| 161 | } |
| 162 | } |
| 163 | |
| 164 | // Call Stop() concurrently with the above RPC attempts. |
| 165 | ss.Stop() |
| 166 | wg.Wait() |
| 167 | } |
| 168 |
nothing calls this directly
no test coverage detected