(t *testing.T)
| 1858 | } |
| 1859 | |
| 1860 | func TestPublishAsyncRetryInErrHandler(t *testing.T) { |
| 1861 | s := RunBasicJetStreamServer() |
| 1862 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 1863 | |
| 1864 | nc, err := nats.Connect(s.ClientURL()) |
| 1865 | if err != nil { |
| 1866 | t.Fatalf("Unexpected error: %v", err) |
| 1867 | } |
| 1868 | |
| 1869 | streamCreated := make(chan struct{}) |
| 1870 | errCB := func(js jetstream.JetStream, m *nats.Msg, e error) { |
| 1871 | <-streamCreated |
| 1872 | _, err := js.PublishMsgAsync(m) |
| 1873 | if err != nil { |
| 1874 | t.Fatalf("Unexpected error when republishing: %v", err) |
| 1875 | } |
| 1876 | } |
| 1877 | |
| 1878 | js, err := jetstream.New(nc, jetstream.WithPublishAsyncErrHandler(errCB)) |
| 1879 | if err != nil { |
| 1880 | t.Fatalf("Unexpected error: %v", err) |
| 1881 | } |
| 1882 | defer nc.Close() |
| 1883 | ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) |
| 1884 | defer cancel() |
| 1885 | |
| 1886 | errs := make(chan error, 1) |
| 1887 | done := make(chan struct{}, 1) |
| 1888 | go func() { |
| 1889 | for i := 0; i < 10; i++ { |
| 1890 | if _, err := js.PublishAsync("FOO.A", []byte("hello"), jetstream.WithRetryAttempts(0)); err != nil { |
| 1891 | errs <- err |
| 1892 | return |
| 1893 | } |
| 1894 | } |
| 1895 | done <- struct{}{} |
| 1896 | }() |
| 1897 | select { |
| 1898 | case <-done: |
| 1899 | case err := <-errs: |
| 1900 | t.Fatalf("Unexpected error during publish: %v", err) |
| 1901 | case <-time.After(5 * time.Second): |
| 1902 | t.Fatalf("Did not receive completion signal") |
| 1903 | } |
| 1904 | stream, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) |
| 1905 | if err != nil { |
| 1906 | t.Fatalf("Unexpected error: %v", err) |
| 1907 | } |
| 1908 | |
| 1909 | close(streamCreated) |
| 1910 | select { |
| 1911 | case <-js.PublishAsyncComplete(): |
| 1912 | case <-time.After(5 * time.Second): |
| 1913 | t.Fatalf("Did not receive completion signal") |
| 1914 | } |
| 1915 | |
| 1916 | info, err := stream.Info(context.Background()) |
| 1917 | if err != nil { |
nothing calls this directly
no test coverage detected