startStalledMockServer starts a fake NATS server on an ephemeral port that completes the INFO/CONNECT/PING handshake and then stops reading from the socket, so client writes eventually stall.
(t *testing.T)
// startStalledMockServer starts a fake NATS server on an ephemeral port
// that completes the INFO/CONNECT/PING handshake and then stops reading
// from the socket, so client writes eventually stall.
//
// The server accepts a single connection, shrinks that connection's receive
// buffer to 1024 bytes, sends an INFO line advertising the listener's own
// address, performs one read of up to 100 bytes for the client's
// CONNECT+PING, and answers with PONG. After that it never reads again.
// Handshake failures are reported through t.Errorf.
//
// The returned address is the one the listener is bound to. A cleanup
// registered on t closes the listener and the accepted connection and then
// waits for the server goroutine to exit, so the helper cannot outlive the
// test even when the client never sent anything.
func startStalledMockServer(t *testing.T) *net.TCPAddr {
	t.Helper()
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		t.Fatalf("Could not listen on an ephemeral port: %v", err)
	}
	addr := l.Addr().(*net.TCPAddr)

	done := make(chan struct{})
	exited := make(chan struct{})

	// failf reports a handshake failure unless cleanup has already begun; in
	// that case the error is only a side effect of the connection being torn
	// down and must not fail the test.
	failf := func(format string, err error) {
		select {
		case <-done:
		default:
			t.Errorf(format, err)
		}
	}

	go func() {
		defer close(exited)
		conn, err := l.Accept()
		if err != nil {
			// The listener was closed before any client connected.
			return
		}
		defer conn.Close()

		// Closing the listener does not unblock a pending conn.Read, so close
		// the connection as soon as cleanup starts. Without this, a client
		// that connected but had not written yet would leave this goroutine
		// parked in Read and the cleanup waiting on exited forever.
		go func() {
			select {
			case <-done:
				conn.Close()
			case <-exited:
			}
		}()

		// A small receive buffer makes the client's writes stall sooner once
		// this side stops reading.
		if err := conn.(*net.TCPConn).SetReadBuffer(1024); err != nil {
			failf("Expected SetReadBuffer to succeed, got: %v", err)
			return
		}
		info := fmt.Sprintf(
			"INFO {\"server_id\":\"foobar\",\"host\":\"%s\",\"port\":%d,\"auth_required\":false,\"tls_required\":false,\"max_payload\":%d}\r\n",
			addr.IP, addr.Port, 1024*1024,
		)
		if _, err := conn.Write([]byte(info)); err != nil {
			failf("Expected INFO write to succeed, got: %v", err)
			return
		}
		// A single read is enough: the content is not parsed, it only has to
		// arrive before PONG is sent.
		line := make([]byte, 100)
		if _, err := conn.Read(line); err != nil {
			failf("Expected CONNECT+PING, got: %v", err)
			return
		}
		if _, err := conn.Write([]byte("PONG\r\n")); err != nil {
			failf("Expected PONG write to succeed, got: %v", err)
			return
		}
		// Stop reading: hold the connection open without draining it until
		// the test is over.
		<-done
	}()

	t.Cleanup(func() {
		close(done)
		l.Close()
		<-exited
	})
	return addr
}
| 2083 | |
| 2084 | func TestReconnectOnFlusherError(t *testing.T) { |
| 2085 | if runtime.GOOS == "windows" { |
no test coverage detected