MCPcopy
hub / github.com/grpc/grpc-go / TestServerStreaming_ClientCallSendMsgTwice

Method TestServerStreaming_ClientCallSendMsgTwice

test/end2end_test.go:3952–4019  ·  view source on GitHub ↗

Tests the behavior for server-side streaming when client calls SendMsg twice. Second call to SendMsg should fail with Internal error and result in closing the connection with a RST_STREAM.

(t *testing.T)

Source from the content-addressed store, hash-verified

3950// Second call to SendMsg should fail with Internal error and result in closing
3951// the connection with a RST_STREAM.
3952func (s) TestServerStreaming_ClientCallSendMsgTwice(t *testing.T) {
3953 // To ensure initial call to server.recvMsg() made by the generated code is successfully
3954 // completed. Otherwise, if the client attempts to send a second request message, that
3955 // will trigger a RST_STREAM from the client due to the application violating the RPC's
3956 // protocol. The RST_STREAM could cause the server’s first RecvMsg to fail and will prevent
3957 // the method handler from being called.
3958 recvDoneOnServer := make(chan struct{})
3959 // To ensure goroutine for test does not end before RPC handler performs error
3960 // checking.
3961 handlerDone := make(chan struct{})
3962 ss := stubserver.StubServer{
3963 StreamingOutputCallF: func(_ *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error {
3964 close(recvDoneOnServer)
3965 // Block until the stream’s context is done. Second call to client.SendMsg
3966 // triggers a RST_STREAM which cancels the stream context on the server.
3967 <-stream.Context().Done()
3968 var err error
3969 waitCtx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
3970 defer cancel()
3971 for ; waitCtx.Err() == nil; <-time.After(time.Millisecond) {
3972 if err = stream.SendMsg(&testpb.StreamingOutputCallRequest{}); err != nil {
3973 break
3974 }
3975 }
3976 if waitCtx.Err() != nil {
3977 t.Fatalf("Context timed out waiting for stream.SendMsg() to return an error")
3978 }
3979 if status.Code(err) != codes.Canceled {
3980 t.Errorf("stream.SendMsg() = %v, want error %v", err, codes.Canceled)
3981 }
3982 close(handlerDone)
3983 return nil
3984 },
3985 }
3986 if err := ss.Start(nil); err != nil {
3987 t.Fatal("Error starting server:", err)
3988 }
3989 defer ss.Stop()
3990
3991 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
3992 defer cancel()
3993 cc, err := grpc.NewClient(ss.Address, grpc.WithTransportCredentials(local.NewCredentials()))
3994 if err != nil {
3995 t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", ss.Address, err)
3996 }
3997 defer cc.Close()
3998
3999 desc := &grpc.StreamDesc{
4000 StreamName: "StreamingOutputCall",
4001 ServerStreams: true,
4002 ClientStreams: false,
4003 }
4004
4005 stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/StreamingOutputCall")
4006 if err != nil {
4007 t.Fatalf("cc.NewStream() failed unexpectedly: %v", err)
4008 }
4009

Callers

nothing calls this directly

Calls 15

ContextMethod · 0.95
SendMsgMethod · 0.95
StartMethod · 0.95
StopMethod · 0.95
NewClientFunction · 0.92
WithTransportCredentialsFunction · 0.92
NewCredentialsFunction · 0.92
ErrMethod · 0.80
CodeMethod · 0.80
FatalfMethod · 0.65
ErrorfMethod · 0.65
FatalMethod · 0.65

Tested by

no test coverage detected