MCP · copy
hub / github.com/nats-io/nats.go / TestJetStreamResetConsumer

Function TestJetStreamResetConsumer

jetstream/test/consumer_test.go:1603–1718  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

1601}
1602
1603func TestJetStreamResetConsumer(t *testing.T) {
1604 srv := RunBasicJetStreamServer()
1605 defer shutdownJSServerAndRemoveStorage(t, srv)
1606
1607 nc, err := nats.Connect(srv.ClientURL())
1608 if err != nil {
1609 t.Fatalf("Unexpected error: %v", err)
1610 }
1611 defer nc.Close()
1612
1613 ctx := context.Background()
1614 js, err := jetstream.New(nc)
1615 if err != nil {
1616 t.Fatalf("Unexpected error: %v", err)
1617 }
1618
1619 _, err = js.CreateStream(ctx, jetstream.StreamConfig{
1620 Name: "TEST",
1621 Subjects: []string{"foo"},
1622 })
1623 if err != nil {
1624 t.Fatalf("Unexpected error: %v", err)
1625 }
1626
1627 for i := range 10 {
1628 if _, err := js.Publish(ctx, "foo", fmt.Appendf(nil, "msg-%d", i)); err != nil {
1629 t.Fatalf("Unexpected error: %v", err)
1630 }
1631 }
1632
1633 cons, err := js.CreateConsumer(ctx, "TEST", jetstream.ConsumerConfig{
1634 Durable: "cons",
1635 AckPolicy: jetstream.AckExplicitPolicy,
1636 })
1637 if err != nil {
1638 t.Fatalf("Unexpected error: %v", err)
1639 }
1640
1641 t.Run("reset to ack floor", func(t *testing.T) {
1642 // Drain 5 messages without ack so ack_floor stays 0 and the
1643 // next fetch after reset is observably different from no-op.
1644 batch, err := cons.Fetch(5)
1645 if err != nil {
1646 t.Fatalf("Unexpected error: %v", err)
1647 }
1648 for range batch.Messages() {
1649 }
1650
1651 resp, err := js.ResetConsumer(ctx, "TEST", "cons")
1652 if err != nil {
1653 t.Fatalf("Unexpected error: %v", err)
1654 }
1655 if resp.ResetSeq != 1 {
1656 t.Fatalf("Expected ResetSeq=1, got %d", resp.ResetSeq)
1657 }
1658
1659 next, err := cons.Fetch(1)
1660 if err != nil {

Callers

nothing calls this directly

Calls 14

New · Function · 0.92
Connect · Method · 0.80
Fatalf · Method · 0.80
RunBasicJetStreamServer · Function · 0.70
CreateStream · Method · 0.65
Publish · Method · 0.65
CreateConsumer · Method · 0.65
Fetch · Method · 0.65
Messages · Method · 0.65
ResetConsumer · Method · 0.65
Metadata · Method · 0.65

Tested by

no test coverage detected