MCPcopy
hub / github.com/nats-io/nats.go / TestPublishAsyncResetPendingOnReconnect

Function TestPublishAsyncResetPendingOnReconnect

jetstream/test/publish_test.go:1703–1770  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

1701}
1702
1703func TestPublishAsyncResetPendingOnReconnect(t *testing.T) {
1704 s := RunBasicJetStreamServer()
1705
1706 nc, err := nats.Connect(s.ClientURL())
1707 if err != nil {
1708 t.Fatalf("Unexpected error: %v", err)
1709 }
1710
1711 js, err := jetstream.New(nc)
1712 if err != nil {
1713 t.Fatalf("Unexpected error: %v", err)
1714 }
1715 defer nc.Close()
1716 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
1717 defer cancel()
1718 _, err = js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
1719 if err != nil {
1720 t.Fatalf("Unexpected error: %v", err)
1721 }
1722
1723 errs := make(chan error, 1)
1724 done := make(chan struct{}, 1)
1725 acks := make(chan jetstream.PubAckFuture, 100)
1726 wg := sync.WaitGroup{}
1727 go func() {
1728 for i := 0; i < 100; i++ {
1729 if ack, err := js.PublishAsync("FOO.A", []byte("hello")); err != nil {
1730 errs <- err
1731 return
1732 } else {
1733 acks <- ack
1734 }
1735 wg.Add(1)
1736 }
1737 close(acks)
1738 done <- struct{}{}
1739 }()
1740 select {
1741 case <-done:
1742 case err := <-errs:
1743 t.Fatalf("Unexpected error during publish: %v", err)
1744 case <-time.After(5 * time.Second):
1745 t.Fatalf("Did not receive completion signal")
1746 }
1747 for ack := range acks {
1748 go func(paf jetstream.PubAckFuture) {
1749 select {
1750 case <-paf.Ok():
1751 case err := <-paf.Err():
1752 if !errors.Is(err, nats.ErrDisconnected) && !errors.Is(err, nats.ErrNoResponders) {
1753 errs <- fmt.Errorf("Expected error: %v or %v; got: %v", nats.ErrDisconnected, nats.ErrNoResponders, err)
1754 }
1755 case <-time.After(5 * time.Second):
1756 errs <- errors.New("Did not receive completion signal")
1757 }
1758 wg.Done()
1759 }(ack)
1760 }

Callers

nothing calls this directly

Calls 15

New (Function) · 0.92
Connect (Method) · 0.80
Fatalf (Method) · 0.80
Errorf (Method) · 0.80
RunBasicJetStreamServer (Function) · 0.70
restartBasicJSServer (Function) · 0.70
CreateStream (Method) · 0.65
PublishAsync (Method) · 0.65
Add (Method) · 0.65
Ok (Method) · 0.65
Err (Method) · 0.65

Tested by

no test coverage detected