MCPcopy
hub / github.com/nats-io/nats.go / TestPublishAsyncAckTimeout

Function TestPublishAsyncAckTimeout

jetstream/test/publish_test.go:1926–1987  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

1924}
1925
1926func TestPublishAsyncAckTimeout(t *testing.T) {
1927 s := RunBasicJetStreamServer()
1928 defer shutdownJSServerAndRemoveStorage(t, s)
1929
1930 nc, err := nats.Connect(s.ClientURL())
1931 if err != nil {
1932 t.Fatalf("Unexpected error: %v", err)
1933 }
1934
1935 errs := make(chan error, 1)
1936 js, err := jetstream.New(nc,
1937 jetstream.WithPublishAsyncTimeout(50*time.Millisecond),
1938 jetstream.WithPublishAsyncErrHandler(func(js jetstream.JetStream, m *nats.Msg, e error) {
1939 errs <- e
1940 }),
1941 )
1942 if err != nil {
1943 t.Fatalf("Unexpected error: %v", err)
1944 }
1945 defer nc.Close()
1946
1947 _, err = js.CreateStream(context.Background(), jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}, NoAck: true})
1948 if err != nil {
1949 t.Fatalf("Unexpected error: %v", err)
1950 }
1951
1952 ack, err := js.PublishAsync("FOO.A", []byte("hello"))
1953 if err != nil {
1954 t.Fatalf("Unexpected error: %v", err)
1955 }
1956
1957 select {
1958 case <-ack.Ok():
1959 t.Fatalf("Expected timeout")
1960 case err := <-ack.Err():
1961 if !errors.Is(err, jetstream.ErrAsyncPublishTimeout) {
1962 t.Fatalf("Expected error: %v; got: %v", jetstream.ErrAsyncPublishTimeout, err)
1963 }
1964 case <-time.After(time.Second):
1965 t.Fatalf("Did not receive ack timeout")
1966 }
1967
1968 // check if error callback is called
1969 select {
1970 case err := <-errs:
1971 if !errors.Is(err, jetstream.ErrAsyncPublishTimeout) {
1972 t.Fatalf("Expected error: %v; got: %v", jetstream.ErrAsyncPublishTimeout, err)
1973 }
1974 case <-time.After(time.Second):
1975 t.Fatalf("Did not receive error from error handler")
1976 }
1977
1978 if js.PublishAsyncPending() != 0 {
1979 t.Fatalf("Expected no pending messages")
1980 }
1981
1982 select {
1983 case <-js.PublishAsyncComplete():

Callers

nothing calls this directly

Calls 15

New (Function) · 0.92
WithPublishAsyncTimeout (Function) · 0.92
Connect (Method) · 0.80
Fatalf (Method) · 0.80
RunBasicJetStreamServer (Function) · 0.70
CreateStream (Method) · 0.65
PublishAsync (Method) · 0.65
Ok (Method) · 0.65
Err (Method) · 0.65
PublishAsyncPending (Method) · 0.65

Tested by

no test coverage detected