(t *testing.T)
| 64 | } |
| 65 | |
| 66 | func (s) TestStreamCleanupAfterSendStatus(t *testing.T) { |
| 67 | const initialWindowSize uint = 70 * 1024 // Must be higher than default 64K, ignored otherwise |
| 68 | const bodySize = 2 * initialWindowSize // Something that is not going to fit in a single window |
| 69 | |
| 70 | serverReturnedStatus := make(chan struct{}) |
| 71 | |
| 72 | ss := &stubserver.StubServer{ |
| 73 | FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error { |
| 74 | defer func() { |
| 75 | close(serverReturnedStatus) |
| 76 | }() |
| 77 | return stream.Send(&testpb.StreamingOutputCallResponse{ |
| 78 | Payload: &testpb.Payload{ |
| 79 | Body: make([]byte, bodySize), |
| 80 | }, |
| 81 | }) |
| 82 | }, |
| 83 | } |
| 84 | if err := ss.Start(nil, grpc.WithInitialWindowSize(int32(initialWindowSize))); err != nil { |
| 85 | t.Fatalf("Error starting endpoint server: %v", err) |
| 86 | } |
| 87 | defer ss.Stop() |
| 88 | |
| 89 | // This test makes sure we don't delete stream from server transport's |
| 90 | // activeStreams list too aggressively. |
| 91 | |
| 92 | // 1. Make a long living stream RPC. So server's activeStream list is not |
| 93 | // empty. |
| 94 | ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| 95 | defer cancel() |
| 96 | stream, err := ss.Client.FullDuplexCall(ctx) |
| 97 | if err != nil { |
| 98 | t.Fatalf("FullDuplexCall= _, %v; want _, <nil>", err) |
| 99 | } |
| 100 | |
| 101 | // 2. Wait for service handler to return status. |
| 102 | // |
| 103 | // This will trigger a stream cleanup code, which will eventually remove |
| 104 | // this stream from activeStream. |
| 105 | // |
| 106 | // But the stream removal won't happen because it's supposed to be done |
| 107 | // after the status is sent by loopyWriter, and the status send is blocked |
| 108 | // by flow control. |
| 109 | <-serverReturnedStatus |
| 110 | |
| 111 | // 3. GracefulStop (besides sending goaway) checks the number of |
| 112 | // activeStreams. |
| 113 | // |
| 114 | // It will close the connection if there's no active streams. This won't |
| 115 | // happen because of the pending stream. But if there's a bug in stream |
| 116 | // cleanup that causes stream to be removed too aggressively, the connection |
| 117 | // will be closed and the stream will be broken. |
| 118 | gracefulStopDone := make(chan struct{}) |
| 119 | go func() { |
| 120 | defer close(gracefulStopDone) |
| 121 | ss.S.GracefulStop() |
| 122 | }() |
| 123 |
nothing calls this directly
no test coverage detected