(t *testing.T)
| 4682 | } |
| 4683 | |
| 4684 | func TestJetStreamAutoMaxAckPending(t *testing.T) { |
| 4685 | s := RunBasicJetStreamServer() |
| 4686 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 4687 | |
| 4688 | nc, js := jsClient(t, s, nats.SyncQueueLen(500)) |
| 4689 | defer nc.Close() |
| 4690 | |
| 4691 | var err error |
| 4692 | |
| 4693 | _, err = js.AddStream(&nats.StreamConfig{Name: "foo"}) |
| 4694 | if err != nil { |
| 4695 | t.Fatalf("Unexpected error: %v", err) |
| 4696 | } |
| 4697 | |
| 4698 | toSend := 10_000 |
| 4699 | |
| 4700 | msg := []byte("Hello") |
| 4701 | for i := 0; i < toSend; i++ { |
| 4702 | // Use plain NATS here for speed. |
| 4703 | nc.Publish("foo", msg) |
| 4704 | } |
| 4705 | nc.Flush() |
| 4706 | |
| 4707 | // Create a consumer. |
| 4708 | sub, err := js.SubscribeSync("foo") |
| 4709 | if err != nil { |
| 4710 | t.Fatalf("Unexpected error: %v", err) |
| 4711 | } |
| 4712 | defer sub.Unsubscribe() |
| 4713 | expectedMaxAck, _, _ := sub.PendingLimits() |
| 4714 | |
| 4715 | ci, err := sub.ConsumerInfo() |
| 4716 | if err != nil { |
| 4717 | t.Fatalf("Unexpected error: %v", err) |
| 4718 | } |
| 4719 | if ci.Config.MaxAckPending != expectedMaxAck { |
| 4720 | t.Fatalf("Expected MaxAckPending to be set to %d, got %d", expectedMaxAck, ci.Config.MaxAckPending) |
| 4721 | } |
| 4722 | |
| 4723 | waitForPending := func(n int) { |
| 4724 | timeout := time.Now().Add(2 * time.Second) |
| 4725 | for time.Now().Before(timeout) { |
| 4726 | if msgs, _, _ := sub.Pending(); msgs == n { |
| 4727 | return |
| 4728 | } |
| 4729 | time.Sleep(10 * time.Millisecond) |
| 4730 | } |
| 4731 | msgs, _, _ := sub.Pending() |
| 4732 | t.Fatalf("Expected to receive %d messages, but got %d", n, msgs) |
| 4733 | } |
| 4734 | |
| 4735 | waitForPending(expectedMaxAck) |
| 4736 | // We do it twice to make sure it does not go over. |
| 4737 | waitForPending(expectedMaxAck) |
| 4738 | |
| 4739 | // Now make sure we can consume them all with no slow consumers etc. |
| 4740 | for i := 0; i < toSend; i++ { |
| 4741 | m, err := sub.NextMsg(time.Second) |
nothing calls this directly
no test coverage detected