MCPcopy
hub / github.com/grpc/grpc-go / TestStreamCleanupAfterSendStatus

Method TestStreamCleanupAfterSendStatus

test/stream_cleanup_test.go:66–139  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

64}
65
66func (s) TestStreamCleanupAfterSendStatus(t *testing.T) {
67 const initialWindowSize uint = 70 * 1024 // Must be higher than default 64K, ignored otherwise
68 const bodySize = 2 * initialWindowSize // Something that is not going to fit in a single window
69
70 serverReturnedStatus := make(chan struct{})
71
72 ss := &stubserver.StubServer{
73 FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
74 defer func() {
75 close(serverReturnedStatus)
76 }()
77 return stream.Send(&testpb.StreamingOutputCallResponse{
78 Payload: &testpb.Payload{
79 Body: make([]byte, bodySize),
80 },
81 })
82 },
83 }
84 if err := ss.Start(nil, grpc.WithInitialWindowSize(int32(initialWindowSize))); err != nil {
85 t.Fatalf("Error starting endpoint server: %v", err)
86 }
87 defer ss.Stop()
88
89 // This test makes sure we don't delete stream from server transport's
90 // activeStreams list too aggressively.
91
92 // 1. Make a long-lived streaming RPC, so the server's activeStreams list is
93 // not empty.
94 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
95 defer cancel()
96 stream, err := ss.Client.FullDuplexCall(ctx)
97 if err != nil {
98 t.Fatalf("FullDuplexCall= _, %v; want _, <nil>", err)
99 }
100
101 // 2. Wait for service handler to return status.
102 //
103 // This will trigger the stream cleanup code, which will eventually remove
104 // this stream from activeStreams.
105 //
106 // But the stream removal won't happen because it's supposed to be done
107 // after the status is sent by loopyWriter, and the status send is blocked
108 // by flow control.
109 <-serverReturnedStatus
110
111 // 3. GracefulStop (besides sending goaway) checks the number of
112 // activeStreams.
113 //
114 // It will close the connection if there are no active streams. This won't
115 // happen because of the pending stream. But if there's a bug in stream
116 // cleanup that causes the stream to be removed too aggressively, the
117 // connection will be closed and the stream will be broken.
118 gracefulStopDone := make(chan struct{})
119 go func() {
120 defer close(gracefulStopDone)
121 ss.S.GracefulStop()
122 }()
123

Callers

nothing calls this directly

Calls 10

StartMethod · 0.95
StopMethod · 0.95
StopMethod · 0.95
WithInitialWindowSizeFunction · 0.92
NewTimerMethod · 0.80
SendMethod · 0.65
FatalfMethod · 0.65
FullDuplexCallMethod · 0.65
GracefulStopMethod · 0.65
RecvMethod · 0.65

Tested by

no test coverage detected