MCPcopy
hub / github.com/nats-io/nats.go / TestStreamResetConsumer

Function TestStreamResetConsumer

jetstream/test/stream_test.go:1747–1827  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

1745}
1746
1747func TestStreamResetConsumer(t *testing.T) {
1748 srv := RunBasicJetStreamServer()
1749 defer shutdownJSServerAndRemoveStorage(t, srv)
1750
1751 nc, err := nats.Connect(srv.ClientURL())
1752 if err != nil {
1753 t.Fatalf("Unexpected error: %v", err)
1754 }
1755 defer nc.Close()
1756
1757 js, err := jetstream.New(nc)
1758 if err != nil {
1759 t.Fatalf("Unexpected error: %v", err)
1760 }
1761
1762 ctx := context.Background()
1763 s, err := js.CreateStream(ctx, jetstream.StreamConfig{
1764 Name: "TEST",
1765 Subjects: []string{"foo"},
1766 })
1767 if err != nil {
1768 t.Fatalf("Unexpected error: %v", err)
1769 }
1770
1771 for i := range 10 {
1772 if _, err := js.Publish(ctx, "foo", fmt.Appendf(nil, "msg-%d", i)); err != nil {
1773 t.Fatalf("Unexpected error: %v", err)
1774 }
1775 }
1776
1777 cons, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
1778 Durable: "cons",
1779 AckPolicy: jetstream.AckExplicitPolicy,
1780 })
1781 if err != nil {
1782 t.Fatalf("Unexpected error: %v", err)
1783 }
1784
1785 // Drain 5 messages without ack so ack_floor stays 0.
1786 batch, err := cons.Fetch(5)
1787 if err != nil {
1788 t.Fatalf("Unexpected error: %v", err)
1789 }
1790 count := 0
1791 for range batch.Messages() {
1792 count++
1793 }
1794 if count != 5 {
1795 t.Fatalf("Expected 5 messages, got %d", count)
1796 }
1797
1798 resp, err := s.ResetConsumer(ctx, "cons")
1799 if err != nil {
1800 t.Fatalf("Unexpected error: %v", err)
1801 }
1802 if resp.ResetSeq != 1 {
1803 t.Fatalf("Expected ResetSeq=1, got %d", resp.ResetSeq)
1804 }

Callers

nothing calls this directly

Calls 13

NewFunction · 0.92
ConnectMethod · 0.80
FatalfMethod · 0.80
RunBasicJetStreamServerFunction · 0.70
CreateStreamMethod · 0.65
PublishMethod · 0.65
FetchMethod · 0.65
MessagesMethod · 0.65
ResetConsumerMethod · 0.65
MetadataMethod · 0.65

Tested by

no test coverage detected