MCPcopy
hub / github.com/grpc/grpc-go / TestGracefulClose

Method TestGracefulClose

internal/transport/transport_test.go:1007–1075  ·  view source on GitHub ↗

TestGracefulClose ensures that GracefulClose allows in-flight streams to proceed until they complete naturally, while not allowing creation of new streams during this window.

(t *testing.T)

Source from the content-addressed store, hash-verified

1005// proceed until they complete naturally, while not allowing creation of new
1006// streams during this window.
1007func (s) TestGracefulClose(t *testing.T) {
1008 leakcheck.SetTrackingBufferPool(t)
1009 server, ct, cancel := setUp(t, 0, pingpong)
1010 defer cancel()
1011 defer func() {
1012 // Stop the server's listener to make the server's goroutines terminate
1013 // (after the last active stream is done).
1014 server.lis.Close()
1015 // Check for goroutine leaks (i.e. GracefulClose with an active stream
1016 // doesn't eventually close the connection when that stream completes).
1017 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
1018 defer cancel()
1019 leakcheck.CheckGoroutines(ctx, t)
1020 leakcheck.CheckTrackingBufferPool()
1021 // Correctly clean up the server
1022 server.stop()
1023 }()
1024 ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
1025 defer cancel()
1026
1027 // Create a stream that will exist for this whole test and confirm basic
1028 // functionality.
1029 s, err := ct.NewStream(ctx, &CallHdr{}, nil)
1030 if err != nil {
1031 t.Fatalf("NewStream(_, _) = _, %v, want _, <nil>", err)
1032 }
1033 msg := make([]byte, 1024)
1034 outgoingHeader := make([]byte, 5)
1035 outgoingHeader[0] = byte(0)
1036 binary.BigEndian.PutUint32(outgoingHeader[1:], uint32(len(msg)))
1037 incomingHeader := make([]byte, 5)
1038 if err := s.Write(outgoingHeader, newBufferSlice(msg), &WriteOptions{}); err != nil {
1039 t.Fatalf("Error while writing: %v", err)
1040 }
1041 if _, err := s.readTo(incomingHeader); err != nil {
1042 t.Fatalf("Error while reading: %v", err)
1043 }
1044 sz := binary.BigEndian.Uint32(incomingHeader[1:])
1045 recvMsg := make([]byte, int(sz))
1046 if _, err := s.readTo(recvMsg); err != nil {
1047 t.Fatalf("Error while reading: %v", err)
1048 }
1049
1050 // Gracefully close the transport, which should not affect the existing
1051 // stream.
1052 ct.GracefulClose()
1053
1054 var wg sync.WaitGroup
1055 // Expect errors creating new streams because the client transport has been
1056 // gracefully closed.
1057 for i := 0; i < 200; i++ {
1058 wg.Add(1)
1059 go func() {
1060 defer wg.Done()
1061 _, err := ct.NewStream(ctx, &CallHdr{}, nil)
1062 if err != nil && err.(*NewStreamError).Err == ErrConnClosing && err.(*NewStreamError).AllowTransparentRetry {
1063 return
1064 }

Callers

nothing calls this directly

Calls 15

SetTrackingBufferPool · Function · 0.92
CheckGoroutines · Function · 0.92
CheckTrackingBufferPool · Function · 0.92
setUp · Function · 0.85
newBufferSlice · Function · 0.85
readTo · Method · 0.80
Wait · Method · 0.80
Close · Method · 0.65
stop · Method · 0.65
NewStream · Method · 0.65
Fatalf · Method · 0.65
Write · Method · 0.65

Tested by

no test coverage detected