(t *testing.T)
| 1701 | } |
| 1702 | |
| 1703 | func TestPublishAsyncResetPendingOnReconnect(t *testing.T) { |
| 1704 | s := RunBasicJetStreamServer() |
| 1705 | |
| 1706 | nc, err := nats.Connect(s.ClientURL()) |
| 1707 | if err != nil { |
| 1708 | t.Fatalf("Unexpected error: %v", err) |
| 1709 | } |
| 1710 | |
| 1711 | js, err := jetstream.New(nc) |
| 1712 | if err != nil { |
| 1713 | t.Fatalf("Unexpected error: %v", err) |
| 1714 | } |
| 1715 | defer nc.Close() |
| 1716 | ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) |
| 1717 | defer cancel() |
| 1718 | _, err = js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) |
| 1719 | if err != nil { |
| 1720 | t.Fatalf("Unexpected error: %v", err) |
| 1721 | } |
| 1722 | |
| 1723 | errs := make(chan error, 1) |
| 1724 | done := make(chan struct{}, 1) |
| 1725 | acks := make(chan jetstream.PubAckFuture, 100) |
| 1726 | wg := sync.WaitGroup{} |
| 1727 | go func() { |
| 1728 | for i := 0; i < 100; i++ { |
| 1729 | if ack, err := js.PublishAsync("FOO.A", []byte("hello")); err != nil { |
| 1730 | errs <- err |
| 1731 | return |
| 1732 | } else { |
| 1733 | acks <- ack |
| 1734 | } |
| 1735 | wg.Add(1) |
| 1736 | } |
| 1737 | close(acks) |
| 1738 | done <- struct{}{} |
| 1739 | }() |
| 1740 | select { |
| 1741 | case <-done: |
| 1742 | case err := <-errs: |
| 1743 | t.Fatalf("Unexpected error during publish: %v", err) |
| 1744 | case <-time.After(5 * time.Second): |
| 1745 | t.Fatalf("Did not receive completion signal") |
| 1746 | } |
| 1747 | for ack := range acks { |
| 1748 | go func(paf jetstream.PubAckFuture) { |
| 1749 | select { |
| 1750 | case <-paf.Ok(): |
| 1751 | case err := <-paf.Err(): |
| 1752 | if !errors.Is(err, nats.ErrDisconnected) && !errors.Is(err, nats.ErrNoResponders) { |
| 1753 | errs <- fmt.Errorf("Expected error: %v or %v; got: %v", nats.ErrDisconnected, nats.ErrNoResponders, err) |
| 1754 | } |
| 1755 | case <-time.After(5 * time.Second): |
| 1756 | errs <- errors.New("Did not receive completion signal") |
| 1757 | } |
| 1758 | wg.Done() |
| 1759 | }(ack) |
| 1760 | } |
nothing calls this directly
no test coverage detected