MCPcopy
hub / github.com/nats-io/nats.go / TestPublishAsyncRetryInErrHandler

Function TestPublishAsyncRetryInErrHandler

jetstream/test/publish_test.go:1860–1924  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

1858}
1859
1860func TestPublishAsyncRetryInErrHandler(t *testing.T) {
1861 s := RunBasicJetStreamServer()
1862 defer shutdownJSServerAndRemoveStorage(t, s)
1863
1864 nc, err := nats.Connect(s.ClientURL())
1865 if err != nil {
1866 t.Fatalf("Unexpected error: %v", err)
1867 }
1868
1869 streamCreated := make(chan struct{})
1870 errCB := func(js jetstream.JetStream, m *nats.Msg, e error) {
1871 <-streamCreated
1872 _, err := js.PublishMsgAsync(m)
1873 if err != nil {
1874 t.Fatalf("Unexpected error when republishing: %v", err)
1875 }
1876 }
1877
1878 js, err := jetstream.New(nc, jetstream.WithPublishAsyncErrHandler(errCB))
1879 if err != nil {
1880 t.Fatalf("Unexpected error: %v", err)
1881 }
1882 defer nc.Close()
1883 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
1884 defer cancel()
1885
1886 errs := make(chan error, 1)
1887 done := make(chan struct{}, 1)
1888 go func() {
1889 for i := 0; i < 10; i++ {
1890 if _, err := js.PublishAsync("FOO.A", []byte("hello"), jetstream.WithRetryAttempts(0)); err != nil {
1891 errs <- err
1892 return
1893 }
1894 }
1895 done <- struct{}{}
1896 }()
1897 select {
1898 case <-done:
1899 case err := <-errs:
1900 t.Fatalf("Unexpected error during publish: %v", err)
1901 case <-time.After(5 * time.Second):
1902 t.Fatalf("Did not receive completion signal")
1903 }
1904 stream, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
1905 if err != nil {
1906 t.Fatalf("Unexpected error: %v", err)
1907 }
1908
1909 close(streamCreated)
1910 select {
1911 case <-js.PublishAsyncComplete():
1912 case <-time.After(5 * time.Second):
1913 t.Fatalf("Did not receive completion signal")
1914 }
1915
1916 info, err := stream.Info(context.Background())
1917 if err != nil {

Callers

nothing calls this directly

Calls 13

NewFunction · 0.92
WithRetryAttemptsFunction · 0.92
ConnectMethod · 0.80
FatalfMethod · 0.80
RunBasicJetStreamServerFunction · 0.70
PublishMsgAsyncMethod · 0.65
PublishAsyncMethod · 0.65
CreateStreamMethod · 0.65
PublishAsyncCompleteMethod · 0.65
InfoMethod · 0.65

Tested by

no test coverage detected