(t *testing.T)
| 1924 | } |
| 1925 | |
| 1926 | func TestPublishAsyncAckTimeout(t *testing.T) { |
| 1927 | s := RunBasicJetStreamServer() |
| 1928 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 1929 | |
| 1930 | nc, err := nats.Connect(s.ClientURL()) |
| 1931 | if err != nil { |
| 1932 | t.Fatalf("Unexpected error: %v", err) |
| 1933 | } |
| 1934 | |
| 1935 | errs := make(chan error, 1) |
| 1936 | js, err := jetstream.New(nc, |
| 1937 | jetstream.WithPublishAsyncTimeout(50*time.Millisecond), |
| 1938 | jetstream.WithPublishAsyncErrHandler(func(js jetstream.JetStream, m *nats.Msg, e error) { |
| 1939 | errs <- e |
| 1940 | }), |
| 1941 | ) |
| 1942 | if err != nil { |
| 1943 | t.Fatalf("Unexpected error: %v", err) |
| 1944 | } |
| 1945 | defer nc.Close() |
| 1946 | |
| 1947 | _, err = js.CreateStream(context.Background(), jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}, NoAck: true}) |
| 1948 | if err != nil { |
| 1949 | t.Fatalf("Unexpected error: %v", err) |
| 1950 | } |
| 1951 | |
| 1952 | ack, err := js.PublishAsync("FOO.A", []byte("hello")) |
| 1953 | if err != nil { |
| 1954 | t.Fatalf("Unexpected error: %v", err) |
| 1955 | } |
| 1956 | |
| 1957 | select { |
| 1958 | case <-ack.Ok(): |
| 1959 | t.Fatalf("Expected timeout") |
| 1960 | case err := <-ack.Err(): |
| 1961 | if !errors.Is(err, jetstream.ErrAsyncPublishTimeout) { |
| 1962 | t.Fatalf("Expected error: %v; got: %v", jetstream.ErrAsyncPublishTimeout, err) |
| 1963 | } |
| 1964 | case <-time.After(time.Second): |
| 1965 | t.Fatalf("Did not receive ack timeout") |
| 1966 | } |
| 1967 | |
| 1968 | // check if error callback is called |
| 1969 | select { |
| 1970 | case err := <-errs: |
| 1971 | if !errors.Is(err, jetstream.ErrAsyncPublishTimeout) { |
| 1972 | t.Fatalf("Expected error: %v; got: %v", jetstream.ErrAsyncPublishTimeout, err) |
| 1973 | } |
| 1974 | case <-time.After(time.Second): |
| 1975 | t.Fatalf("Did not receive error from error handler") |
| 1976 | } |
| 1977 | |
| 1978 | if js.PublishAsyncPending() != 0 { |
| 1979 | t.Fatalf("Expected no pending messages") |
| 1980 | } |
| 1981 | |
| 1982 | select { |
| 1983 | case <-js.PublishAsyncComplete(): |
nothing calls this directly
no test coverage detected