This test verifies that a client-side cancellation correctly frees up resources on the server. The test setup is designed to simulate a scenario where a server is blocked from sending a large message due to a full client-side flow control window. The client-side cancellation of this blocked RPC then frees
(t *testing.T)
| 237 | // up the max concurrent streams quota on the server, allowing a new RPC to be |
| 238 | // created successfully. |
| 239 | func (s) TestCancelWhileServerWaitingForFlowControl(t *testing.T) { |
| 240 | serverDoneCh := make(chan struct{}, 2) |
| 241 | const flowControlWindowSize = 65535 |
| 242 | ss := &stubserver.StubServer{ |
| 243 | StreamingOutputCallF: func(_ *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error { |
| 244 | // Send a large message to exhaust the client's flow control window. |
| 245 | stream.Send(&testpb.StreamingOutputCallResponse{ |
| 246 | Payload: &testpb.Payload{ |
| 247 | Body: make([]byte, flowControlWindowSize+1), |
| 248 | }, |
| 249 | }) |
| 250 | serverDoneCh <- struct{}{} |
| 251 | return nil |
| 252 | }, |
| 253 | } |
| 254 | |
| 255 | // Create a server that allows only 1 stream at a time. |
| 256 | ss = stubserver.StartTestService(t, ss, grpc.MaxConcurrentStreams(1)) |
| 257 | defer ss.Stop() |
| 258 | // Use a static flow control window. |
| 259 | if err := ss.StartClient(grpc.WithStaticStreamWindowSize(flowControlWindowSize)); err != nil { |
| 260 | t.Fatalf("Error while start test service client: %v", err) |
| 261 | } |
| 262 | client := ss.Client |
| 263 | ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| 264 | defer cancel() |
| 265 | |
| 266 | streamCtx, streamCancel := context.WithCancel(ctx) |
| 267 | defer streamCancel() |
| 268 | |
| 269 | if _, err := client.StreamingOutputCall(streamCtx, &testpb.StreamingOutputCallRequest{}); err != nil { |
| 270 | t.Fatalf("Failed to create server streaming RPC: %v", err) |
| 271 | } |
| 272 | |
| 273 | // Wait for the server handler to return. This should cause the trailers to |
| 274 | // be buffered on the server, waiting for flow control quota to first send |
| 275 | // the data frame. |
| 276 | select { |
| 277 | case <-ctx.Done(): |
| 278 | t.Fatal("Context timed out waiting for server handler to return.") |
| 279 | case <-serverDoneCh: |
| 280 | } |
| 281 | |
| 282 | // Attempt to create a stream. It should fail since the previous stream is |
| 283 | // still blocked. |
| 284 | shortCtx, shortCancel := context.WithTimeout(ctx, defaultTestShortTimeout) |
| 285 | defer shortCancel() |
| 286 | _, err := client.StreamingOutputCall(shortCtx, &testpb.StreamingOutputCallRequest{}) |
| 287 | if status.Code(err) != codes.DeadlineExceeded { |
| 288 | t.Fatalf("Server stream creation returned error with unexpected status code: %v, want code: %v", err, codes.DeadlineExceeded) |
| 289 | } |
| 290 | |
| 291 | // Cancel the RPC, this should free up concurrent stream quota on the |
| 292 | // server. |
| 293 | streamCancel() |
| 294 | |
| 295 | // Attempt to create another stream. |
| 296 | stream, err := client.StreamingOutputCall(ctx, &testpb.StreamingOutputCallRequest{}) |
nothing calls this directly
no test coverage detected