MCPcopy
hub / github.com/grpc/grpc-go / TestStreamWorkers_RPCsAndStop

Method TestStreamWorkers_RPCsAndStop

server_ext_test.go:111–167  ·  view source on GitHub ↗

Tests the case where the stream worker goroutine option is enabled, and a number of RPCs are initiated around the same time that Stop() is called. This used to result in a write to a closed channel. This test verifies that there is no panic.

(t *testing.T)

Source from the content-addressed store, hash-verified

109// used to result in a write to a closed channel. This test verifies that there
110// is no panic.
111func (s) TestStreamWorkers_RPCsAndStop(t *testing.T) {
112 ss := stubserver.StartTestService(t, nil, grpc.NumStreamWorkers(uint32(runtime.NumCPU())))
113 // This deferred stop takes care of stopping the server when one of the
114 // below grpc.NewClient fail, and the test exits early.
115 defer ss.Stop()
116
117 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
118 defer cancel()
119 const numChannels = 20
120 const numRPCLoops = 20
121
122 // Create a bunch of clientconns and ensure that they are READY by making an
123 // RPC on them.
124 ccs := make([]*grpc.ClientConn, numChannels)
125 for i := 0; i < numChannels; i++ {
126 var err error
127 ccs[i], err = grpc.NewClient(ss.Address, grpc.WithTransportCredentials(insecure.NewCredentials()))
128 if err != nil {
129 t.Fatalf("[iteration: %d] grpc.NewClient(%s) failed: %v", i, ss.Address, err)
130 }
131 defer ccs[i].Close()
132 client := testgrpc.NewTestServiceClient(ccs[i])
133 if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
134 t.Fatalf("EmptyCall() failed: %v", err)
135 }
136 }
137
138 // Make a bunch of concurrent RPCs on the above clientconns. These will
139 // eventually race with Stop(), and will start to fail.
140 var wg sync.WaitGroup
141 for i := 0; i < numChannels; i++ {
142 client := testgrpc.NewTestServiceClient(ccs[i])
143 for j := 0; j < numRPCLoops; j++ {
144 wg.Add(1)
145 go func(client testgrpc.TestServiceClient) {
146 defer wg.Done()
147 for {
148 _, err := client.EmptyCall(ctx, &testpb.Empty{})
149 if err == nil {
150 continue
151 }
152 if code := status.Code(err); code == codes.Unavailable {
153 // Once Stop() has been called on the server, we expect
154 // subsequent calls to fail with Unavailable.
155 return
156 }
157 t.Errorf("EmptyCall() failed: %v", err)
158 return
159 }
160 }(client)
161 }
162 }
163
164 // Call Stop() concurrently with the above RPC attempts.
165 ss.Stop()
166 wg.Wait()
167}
168

Callers

nothing calls this directly

Calls 14

EmptyCallMethod · 0.95
StartTestServiceFunction · 0.92
NumStreamWorkersFunction · 0.92
NewClientFunction · 0.92
WithTransportCredentialsFunction · 0.92
NewCredentialsFunction · 0.92
CodeFunction · 0.92
WaitMethod · 0.80
StopMethod · 0.65
FatalfMethod · 0.65
CloseMethod · 0.65
AddMethod · 0.65

Tested by

no test coverage detected