(t *testing.T)
| 1601 | } |
| 1602 | |
| 1603 | func TestJetStreamResetConsumer(t *testing.T) { |
| 1604 | srv := RunBasicJetStreamServer() |
| 1605 | defer shutdownJSServerAndRemoveStorage(t, srv) |
| 1606 | |
| 1607 | nc, err := nats.Connect(srv.ClientURL()) |
| 1608 | if err != nil { |
| 1609 | t.Fatalf("Unexpected error: %v", err) |
| 1610 | } |
| 1611 | defer nc.Close() |
| 1612 | |
| 1613 | ctx := context.Background() |
| 1614 | js, err := jetstream.New(nc) |
| 1615 | if err != nil { |
| 1616 | t.Fatalf("Unexpected error: %v", err) |
| 1617 | } |
| 1618 | |
| 1619 | _, err = js.CreateStream(ctx, jetstream.StreamConfig{ |
| 1620 | Name: "TEST", |
| 1621 | Subjects: []string{"foo"}, |
| 1622 | }) |
| 1623 | if err != nil { |
| 1624 | t.Fatalf("Unexpected error: %v", err) |
| 1625 | } |
| 1626 | |
| 1627 | for i := range 10 { |
| 1628 | if _, err := js.Publish(ctx, "foo", fmt.Appendf(nil, "msg-%d", i)); err != nil { |
| 1629 | t.Fatalf("Unexpected error: %v", err) |
| 1630 | } |
| 1631 | } |
| 1632 | |
| 1633 | cons, err := js.CreateConsumer(ctx, "TEST", jetstream.ConsumerConfig{ |
| 1634 | Durable: "cons", |
| 1635 | AckPolicy: jetstream.AckExplicitPolicy, |
| 1636 | }) |
| 1637 | if err != nil { |
| 1638 | t.Fatalf("Unexpected error: %v", err) |
| 1639 | } |
| 1640 | |
| 1641 | t.Run("reset to ack floor", func(t *testing.T) { |
| 1642 | // Drain 5 messages without ack so ack_floor stays 0 and the |
| 1643 | // next fetch after reset is observably different from no-op. |
| 1644 | batch, err := cons.Fetch(5) |
| 1645 | if err != nil { |
| 1646 | t.Fatalf("Unexpected error: %v", err) |
| 1647 | } |
| 1648 | for range batch.Messages() { |
| 1649 | } |
| 1650 | |
| 1651 | resp, err := js.ResetConsumer(ctx, "TEST", "cons") |
| 1652 | if err != nil { |
| 1653 | t.Fatalf("Unexpected error: %v", err) |
| 1654 | } |
| 1655 | if resp.ResetSeq != 1 { |
| 1656 | t.Fatalf("Expected ResetSeq=1, got %d", resp.ResetSeq) |
| 1657 | } |
| 1658 | |
| 1659 | next, err := cons.Fetch(1) |
| 1660 | if err != nil { |
nothing calls this directly
no test coverage detected