MCPcopy
hub / github.com/nats-io/nats.go / TestPublishAsyncRetry

Function TestPublishAsyncRetry

jetstream/test/publish_test.go:1772–1858  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

1770}
1771
1772func TestPublishAsyncRetry(t *testing.T) {
1773 tests := []struct {
1774 name string
1775 pubOpts []jetstream.PublishOpt
1776 ackError error
1777 }{
1778 {
1779 name: "retry until stream is ready",
1780 pubOpts: []jetstream.PublishOpt{
1781 jetstream.WithRetryAttempts(10),
1782 jetstream.WithRetryWait(100 * time.Millisecond),
1783 },
1784 },
1785 {
1786 name: "fail after max retries",
1787 pubOpts: []jetstream.PublishOpt{
1788 jetstream.WithRetryAttempts(2),
1789 jetstream.WithRetryWait(50 * time.Millisecond),
1790 },
1791 ackError: jetstream.ErrNoStreamResponse,
1792 },
1793 {
1794 name: "retries disabled",
1795 pubOpts: []jetstream.PublishOpt{
1796 jetstream.WithRetryAttempts(0),
1797 },
1798 ackError: jetstream.ErrNoStreamResponse,
1799 },
1800 }
1801
1802 for _, test := range tests {
1803 t.Run(test.name, func(t *testing.T) {
1804 s := RunBasicJetStreamServer()
1805 defer shutdownJSServerAndRemoveStorage(t, s)
1806
1807 nc, err := nats.Connect(s.ClientURL())
1808 if err != nil {
1809 t.Fatalf("Unexpected error: %v", err)
1810 }
1811
1812 // set max pending to 1 so that we can test if retries don't cause stall
1813 js, err := jetstream.New(nc, jetstream.WithPublishAsyncMaxPending(1))
1814 if err != nil {
1815 t.Fatalf("Unexpected error: %v", err)
1816 }
1817 defer nc.Close()
1818 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
1819 defer cancel()
1820
1821 test.pubOpts = append(test.pubOpts, jetstream.WithStallWait(1*time.Nanosecond))
1822 ack, err := js.PublishAsync("foo", []byte("hello"), test.pubOpts...)
1823 if err != nil {
1824 t.Fatalf("Unexpected error: %v", err)
1825 }
1826 publishComplete := js.PublishAsyncComplete()
1827 errs := make(chan error, 1)
1828 go func() {
1829 // create stream with delay so that publish will receive no responders

Callers

nothing calls this directly

Calls 15

WithRetryAttemptsFunction · 0.92
WithRetryWaitFunction · 0.92
NewFunction · 0.92
WithStallWaitFunction · 0.92
ConnectMethod · 0.80
FatalfMethod · 0.80
RunBasicJetStreamServerFunction · 0.70
PublishAsyncMethod · 0.65
PublishAsyncCompleteMethod · 0.65
CreateStreamMethod · 0.65

Tested by

no test coverage detected