MCPcopy
hub / github.com/grpc/grpc-go / TestCancelWhileServerWaitingForFlowControl

Method TestCancelWhileServerWaitingForFlowControl

test/transport_test.go:239–304  ·  view source on GitHub ↗

This test verifies that a client-side cancellation correctly frees up resources on the server. The test setup is designed to simulate a scenario where a server is blocked from sending a large message due to a full client-side flow control window. The client-side cancellation of this blocked RPC then frees up the max concurrent streams quota on the server, allowing a new RPC to be created successfully.

(t *testing.T)

Source from the content-addressed store, hash-verified

237// up the max concurrent streams quota on the server, allowing a new RPC to be
238// created successfully.
239func (s) TestCancelWhileServerWaitingForFlowControl(t *testing.T) {
240 serverDoneCh := make(chan struct{}, 2)
241 const flowControlWindowSize = 65535
242 ss := &stubserver.StubServer{
243 StreamingOutputCallF: func(_ *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error {
244 // Send a large message to exhaust the client's flow control window.
245 stream.Send(&testpb.StreamingOutputCallResponse{
246 Payload: &testpb.Payload{
247 Body: make([]byte, flowControlWindowSize+1),
248 },
249 })
250 serverDoneCh <- struct{}{}
251 return nil
252 },
253 }
254
255 // Create a server that allows only 1 stream at a time.
256 ss = stubserver.StartTestService(t, ss, grpc.MaxConcurrentStreams(1))
257 defer ss.Stop()
258 // Use a static flow control window.
259 if err := ss.StartClient(grpc.WithStaticStreamWindowSize(flowControlWindowSize)); err != nil {
260 t.Fatalf("Error while start test service client: %v", err)
261 }
262 client := ss.Client
263 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
264 defer cancel()
265
266 streamCtx, streamCancel := context.WithCancel(ctx)
267 defer streamCancel()
268
269 if _, err := client.StreamingOutputCall(streamCtx, &testpb.StreamingOutputCallRequest{}); err != nil {
270 t.Fatalf("Failed to create server streaming RPC: %v", err)
271 }
272
273 // Wait for the server handler to return. This should cause the trailers to
274 // be buffered on the server, waiting for flow control quota to first send
275 // the data frame.
276 select {
277 case <-ctx.Done():
278 t.Fatal("Context timed out waiting for server handler to return.")
279 case <-serverDoneCh:
280 }
281
282 // Attempt to create a stream. It should fail since the previous stream is
283 // still blocked.
284 shortCtx, shortCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
285 defer shortCancel()
286 _, err := client.StreamingOutputCall(shortCtx, &testpb.StreamingOutputCallRequest{})
287 if status.Code(err) != codes.DeadlineExceeded {
288 t.Fatalf("Server stream creation returned error with unexpected status code: %v, want code: %v", err, codes.DeadlineExceeded)
289 }
290
291 // Cancel the RPC, this should free up concurrent stream quota on the
292 // server.
293 streamCancel()
294
295 // Attempt to create another stream.
296 stream, err := client.StreamingOutputCall(ctx, &testpb.StreamingOutputCallRequest{})

Callers

nothing calls this directly

Calls 12

StopMethod · 0.95
StartClientMethod · 0.95
StartTestServiceFunction · 0.92
MaxConcurrentStreamsFunction · 0.92
CodeFunction · 0.92
SendMethod · 0.65
FatalfMethod · 0.65
StreamingOutputCallMethod · 0.65
FatalMethod · 0.65
RecvMethod · 0.65
DoneMethod · 0.45

Tested by

no test coverage detected