(t *testing.T)
| 1745 | } |
| 1746 | |
| 1747 | func TestStreamResetConsumer(t *testing.T) { |
| 1748 | srv := RunBasicJetStreamServer() |
| 1749 | defer shutdownJSServerAndRemoveStorage(t, srv) |
| 1750 | |
| 1751 | nc, err := nats.Connect(srv.ClientURL()) |
| 1752 | if err != nil { |
| 1753 | t.Fatalf("Unexpected error: %v", err) |
| 1754 | } |
| 1755 | defer nc.Close() |
| 1756 | |
| 1757 | js, err := jetstream.New(nc) |
| 1758 | if err != nil { |
| 1759 | t.Fatalf("Unexpected error: %v", err) |
| 1760 | } |
| 1761 | |
| 1762 | ctx := context.Background() |
| 1763 | s, err := js.CreateStream(ctx, jetstream.StreamConfig{ |
| 1764 | Name: "TEST", |
| 1765 | Subjects: []string{"foo"}, |
| 1766 | }) |
| 1767 | if err != nil { |
| 1768 | t.Fatalf("Unexpected error: %v", err) |
| 1769 | } |
| 1770 | |
| 1771 | for i := range 10 { |
| 1772 | if _, err := js.Publish(ctx, "foo", fmt.Appendf(nil, "msg-%d", i)); err != nil { |
| 1773 | t.Fatalf("Unexpected error: %v", err) |
| 1774 | } |
| 1775 | } |
| 1776 | |
| 1777 | cons, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ |
| 1778 | Durable: "cons", |
| 1779 | AckPolicy: jetstream.AckExplicitPolicy, |
| 1780 | }) |
| 1781 | if err != nil { |
| 1782 | t.Fatalf("Unexpected error: %v", err) |
| 1783 | } |
| 1784 | |
| 1785 | // Fetch 5 of the 10 published messages without acking them, so ack_floor stays 0. |
| 1786 | batch, err := cons.Fetch(5) |
| 1787 | if err != nil { |
| 1788 | t.Fatalf("Unexpected error: %v", err) |
| 1789 | } |
| 1790 | count := 0 |
| 1791 | for range batch.Messages() { |
| 1792 | count++ |
| 1793 | } |
| 1794 | if count != 5 { |
| 1795 | t.Fatalf("Expected 5 messages, got %d", count) |
| 1796 | } |
| 1797 | |
| 1798 | resp, err := s.ResetConsumer(ctx, "cons") |
| 1799 | if err != nil { |
| 1800 | t.Fatalf("Unexpected error: %v", err) |
| 1801 | } |
| 1802 | if resp.ResetSeq != 1 { |
| 1803 | t.Fatalf("Expected ResetSeq=1, got %d", resp.ResetSeq) |
| 1804 | } |
nothing calls this directly
no test coverage detected