TestGracefulClose ensures that GracefulClose allows in-flight streams to proceed until they complete naturally, while not allowing creation of new streams during this window.
(t *testing.T)
| 1005 | // proceed until they complete naturally, while not allowing creation of new |
| 1006 | // streams during this window. |
| 1007 | func (s) TestGracefulClose(t *testing.T) { |
| 1008 | leakcheck.SetTrackingBufferPool(t) |
| 1009 | server, ct, cancel := setUp(t, 0, pingpong) |
| 1010 | defer cancel() |
| 1011 | defer func() { |
| 1012 | // Stop the server's listener to make the server's goroutines terminate |
| 1013 | // (after the last active stream is done). |
| 1014 | server.lis.Close() |
| 1015 | // Check for goroutine leaks (i.e. GracefulClose with an active stream |
| 1016 | // doesn't eventually close the connection when that stream completes). |
| 1017 | ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) |
| 1018 | defer cancel() |
| 1019 | leakcheck.CheckGoroutines(ctx, t) |
| 1020 | leakcheck.CheckTrackingBufferPool() |
| 1021 | // Correctly clean up the server |
| 1022 | server.stop() |
| 1023 | }() |
| 1024 | ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) |
| 1025 | defer cancel() |
| 1026 | |
| 1027 | // Create a stream that will exist for this whole test and confirm basic |
| 1028 | // functionality. |
| 1029 | s, err := ct.NewStream(ctx, &CallHdr{}, nil) |
| 1030 | if err != nil { |
| 1031 | t.Fatalf("NewStream(_, _) = _, %v, want _, <nil>", err) |
| 1032 | } |
| 1033 | msg := make([]byte, 1024) |
| 1034 | outgoingHeader := make([]byte, 5) |
| 1035 | outgoingHeader[0] = byte(0) |
| 1036 | binary.BigEndian.PutUint32(outgoingHeader[1:], uint32(len(msg))) |
| 1037 | incomingHeader := make([]byte, 5) |
| 1038 | if err := s.Write(outgoingHeader, newBufferSlice(msg), &WriteOptions{}); err != nil { |
| 1039 | t.Fatalf("Error while writing: %v", err) |
| 1040 | } |
| 1041 | if _, err := s.readTo(incomingHeader); err != nil { |
| 1042 | t.Fatalf("Error while reading: %v", err) |
| 1043 | } |
| 1044 | sz := binary.BigEndian.Uint32(incomingHeader[1:]) |
| 1045 | recvMsg := make([]byte, int(sz)) |
| 1046 | if _, err := s.readTo(recvMsg); err != nil { |
| 1047 | t.Fatalf("Error while reading: %v", err) |
| 1048 | } |
| 1049 | |
| 1050 | // Gracefully close the transport, which should not affect the existing |
| 1051 | // stream. |
| 1052 | ct.GracefulClose() |
| 1053 | |
| 1054 | var wg sync.WaitGroup |
| 1055 | // Expect errors creating new streams because the client transport has been |
| 1056 | // gracefully closed. |
| 1057 | for i := 0; i < 200; i++ { |
| 1058 | wg.Add(1) |
| 1059 | go func() { |
| 1060 | defer wg.Done() |
| 1061 | _, err := ct.NewStream(ctx, &CallHdr{}, nil) |
| 1062 | if err != nil && err.(*NewStreamError).Err == ErrConnClosing && err.(*NewStreamError).AllowTransparentRetry { |
| 1063 | return |
| 1064 | } |
Nothing calls this function directly.
No test coverage was detected for it.