TestInflightStreamClosing ensures that closing an in-flight stream sends a status error to a concurrent stream reader.
(t *testing.T)
| 689 | // TestInflightStreamClosing ensures that closing an in-flight stream sends a |
| 690 | // status error to a concurrent stream reader: stream.readTo, called from a separate goroutine, must return the same error that was passed to stream.Close. |
| 691 | func (s) TestInflightStreamClosing(t *testing.T) { |
| 692 | serverConfig := &ServerConfig{ |
| 693 | BufferPool: mem.DefaultBufferPool(), |
| 694 | } |
| 695 | copts := ConnectOptions{ |
| 696 | BufferPool: mem.DefaultBufferPool(), |
| 697 | } |
| 698 | server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, copts) |
| 699 | defer cancel() |
| 700 | defer server.stop() |
| 701 | defer client.Close(fmt.Errorf("closed manually by test")) |
| 702 | |
| 703 | ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| 704 | defer cancel() |
| 705 | stream, err := client.NewStream(ctx, &CallHdr{}, nil) |
| 706 | if err != nil { |
| 707 | t.Fatalf("Client failed to create RPC request: %v", err) |
| 708 | } |
| 709 | |
| 710 | donec := make(chan struct{}) // closed when the reader goroutine below returns |
| 711 | serr := status.Error(codes.Internal, "client connection is closing") // error handed to stream.Close and expected back from stream.readTo |
| 712 | go func() { |
| 713 | defer close(donec) |
| 714 | if _, err := stream.readTo(make([]byte, defaultWindowSize)); err != serr { |
| 715 | t.Errorf("unexpected Stream error %v, expected %v", err, serr) |
| 716 | } |
| 717 | }() |
| 718 | |
| 719 | // Closing the stream with serr should unblock the concurrent stream.readTo call. |
| 720 | stream.Close(serr) |
| 721 | |
| 722 | // Wait for the reader goroutine to finish; fail the test if it has not returned within 5 seconds. |
| 723 | timeout := time.NewTimer(5 * time.Second) |
| 724 | select { |
| 725 | case <-donec: |
| 726 | if !timeout.Stop() { |
| 727 | <-timeout.C |
| 728 | } |
| 729 | case <-timeout.C: |
| 730 | t.Fatalf("Test timed out, expected a status error.") |
| 731 | } |
| 732 | } |
| 733 | |
| 734 | // Tests that when streamID > MaxStreamId, the current client transport drains. |
| 735 | func (s) TestClientTransportDrainsAfterStreamIDExhausted(t *testing.T) { |
nothing calls this directly
no test coverage detected