(t *testing.T)
| 1770 | } |
| 1771 | |
| 1772 | func TestPublishAsyncRetry(t *testing.T) { |
| 1773 | tests := []struct { |
| 1774 | name string |
| 1775 | pubOpts []jetstream.PublishOpt |
| 1776 | ackError error |
| 1777 | }{ |
| 1778 | { |
| 1779 | name: "retry until stream is ready", |
| 1780 | pubOpts: []jetstream.PublishOpt{ |
| 1781 | jetstream.WithRetryAttempts(10), |
| 1782 | jetstream.WithRetryWait(100 * time.Millisecond), |
| 1783 | }, |
| 1784 | }, |
| 1785 | { |
| 1786 | name: "fail after max retries", |
| 1787 | pubOpts: []jetstream.PublishOpt{ |
| 1788 | jetstream.WithRetryAttempts(2), |
| 1789 | jetstream.WithRetryWait(50 * time.Millisecond), |
| 1790 | }, |
| 1791 | ackError: jetstream.ErrNoStreamResponse, |
| 1792 | }, |
| 1793 | { |
| 1794 | name: "retries disabled", |
| 1795 | pubOpts: []jetstream.PublishOpt{ |
| 1796 | jetstream.WithRetryAttempts(0), |
| 1797 | }, |
| 1798 | ackError: jetstream.ErrNoStreamResponse, |
| 1799 | }, |
| 1800 | } |
| 1801 | |
| 1802 | for _, test := range tests { |
| 1803 | t.Run(test.name, func(t *testing.T) { |
| 1804 | s := RunBasicJetStreamServer() |
| 1805 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 1806 | |
| 1807 | nc, err := nats.Connect(s.ClientURL()) |
| 1808 | if err != nil { |
| 1809 | t.Fatalf("Unexpected error: %v", err) |
| 1810 | } |
| 1811 | |
| 1812 | // set max pending to 1 so that we can test if retries don't cause stall |
| 1813 | js, err := jetstream.New(nc, jetstream.WithPublishAsyncMaxPending(1)) |
| 1814 | if err != nil { |
| 1815 | t.Fatalf("Unexpected error: %v", err) |
| 1816 | } |
| 1817 | defer nc.Close() |
| 1818 | ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) |
| 1819 | defer cancel() |
| 1820 | |
| 1821 | test.pubOpts = append(test.pubOpts, jetstream.WithStallWait(1*time.Nanosecond)) |
| 1822 | ack, err := js.PublishAsync("foo", []byte("hello"), test.pubOpts...) |
| 1823 | if err != nil { |
| 1824 | t.Fatalf("Unexpected error: %v", err) |
| 1825 | } |
| 1826 | publishComplete := js.PublishAsyncComplete() |
| 1827 | errs := make(chan error, 1) |
| 1828 | go func() { |
| 1829 | // create stream with delay so that publish will receive no responders |
nothing calls this directly
no test coverage detected