To make sure that canceling a stream with compression enabled won't result in internal error, compressed flag set with identity or empty encoding. The root cause is a select race on stream headerChan and ctx. Stream gets whether compression is enabled and the compression type from two separate func
(t *testing.T)
| 124 | // first one, but `case ctx.Done()` wins the second one, the compression info |
| 125 | // will be inconsistent, and it causes internal error. |
| 126 | func (s) TestCancelWhileRecvingWithCompression(t *testing.T) { |
| 127 | ss := &stubserver.StubServer{ |
| 128 | FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error { |
| 129 | for { |
| 130 | if err := stream.Send(&testpb.StreamingOutputCallResponse{ |
| 131 | Payload: nil, |
| 132 | }); err != nil { |
| 133 | return err |
| 134 | } |
| 135 | } |
| 136 | }, |
| 137 | } |
| 138 | if err := ss.Start(nil); err != nil { |
| 139 | t.Fatalf("Error starting endpoint server: %v", err) |
| 140 | } |
| 141 | defer ss.Stop() |
| 142 | |
| 143 | for i := 0; i < 10; i++ { |
| 144 | ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| 145 | s, err := ss.Client.FullDuplexCall(ctx, grpc.UseCompressor(gzip.Name)) |
| 146 | if err != nil { |
| 147 | t.Fatalf("failed to start bidi streaming RPC: %v", err) |
| 148 | } |
| 149 | // Cancel the stream while receiving to trigger the internal error. |
| 150 | time.AfterFunc(time.Millisecond, cancel) |
| 151 | for { |
| 152 | _, err := s.Recv() |
| 153 | if err != nil { |
| 154 | if status.Code(err) != codes.Canceled { |
| 155 | t.Fatalf("recv failed with %v, want Canceled", err) |
| 156 | } |
| 157 | break |
| 158 | } |
| 159 | } |
| 160 | } |
| 161 | if err := ss.CC.Close(); err != nil { |
| 162 | t.Fatalf("Close failed with %v, want nil", err) |
| 163 | } |
| 164 | } |
nothing calls this directly
no test coverage detected