(t *testing.T)
| 1909 | } |
| 1910 | |
| 1911 | func TestCustomFlusherTimeout(t *testing.T) { |
| 1912 | if runtime.GOOS == "windows" { |
| 1913 | t.SkipNow() |
| 1914 | } |
| 1915 | s := RunDefaultServer() |
| 1916 | defer s.Shutdown() |
| 1917 | |
| 1918 | // A reasonably large flusher timeout will not induce errors |
| 1919 | // when we can flush fast. |
| 1920 | nc1, err := nats.Connect(nats.DefaultURL, nats.FlusherTimeout(10*time.Second)) |
| 1921 | if err != nil { |
| 1922 | t.Fatalf("Expected to be able to connect, got: %s", err) |
| 1923 | } |
| 1924 | doneCh := make(chan struct{}, 1) |
| 1925 | // We want to have a payload size that is big enough so that after |
| 1926 | // a few publishes, the socket buffer will be full and produce the timeout. |
| 1927 | // Since we try to produce the error in the flusher and not the publish |
| 1928 | // call itself, use a size that is a bit less than the internal |
| 1929 | // buffer used by the library. |
| 1930 | payloadBytes := make([]byte, 32*1024-200) |
| 1931 | |
| 1932 | errCh := make(chan error, 1) |
| 1933 | wg := sync.WaitGroup{} |
| 1934 | wg.Add(1) |
| 1935 | go func() { |
| 1936 | defer wg.Done() |
| 1937 | for { |
| 1938 | select { |
| 1939 | case <-time.After(200 * time.Millisecond): |
| 1940 | err := nc1.Publish("hello", payloadBytes) |
| 1941 | if err != nil { |
| 1942 | errCh <- err |
| 1943 | return |
| 1944 | } |
| 1945 | case <-doneCh: |
| 1946 | return |
| 1947 | } |
| 1948 | } |
| 1949 | }() |
| 1950 | defer nc1.Close() |
| 1951 | |
| 1952 | addr := startStalledMockServer(t) |
| 1953 | |
| 1954 | nc2, err := nats.Connect( |
| 1955 | // URL to fake server |
| 1956 | fmt.Sprintf("nats://127.0.0.1:%d", addr.Port), |
| 1957 | // Use custom dialer so we can set write buffer to low value |
| 1958 | nats.SetCustomDialer(&lowWriteBufferDialer{}), |
| 1959 | // Use short flusher timeout to trigger the error |
| 1960 | nats.FlusherTimeout(15*time.Millisecond), |
| 1961 | // Make sure the library does not close connection due |
| 1962 | // to pings for this test. |
| 1963 | nats.PingInterval(20*time.Second), |
| 1964 | // No reconnect |
| 1965 | nats.NoReconnect(), |
| 1966 | // Notify when connection lost |
| 1967 | nats.ClosedHandler(func(_ *nats.Conn) { |
| 1968 | doneCh <- struct{}{} |
nothing calls this directly
no test coverage detected