MCPcopy
hub / github.com/nats-io/nats.go / TestJetStreamAutoMaxAckPending

Function TestJetStreamAutoMaxAckPending

test/js_test.go:4684–4747  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

4682}
4683
4684func TestJetStreamAutoMaxAckPending(t *testing.T) {
4685 s := RunBasicJetStreamServer()
4686 defer shutdownJSServerAndRemoveStorage(t, s)
4687
4688 nc, js := jsClient(t, s, nats.SyncQueueLen(500))
4689 defer nc.Close()
4690
4691 var err error
4692
4693 _, err = js.AddStream(&nats.StreamConfig{Name: "foo"})
4694 if err != nil {
4695 t.Fatalf("Unexpected error: %v", err)
4696 }
4697
4698 toSend := 10_000
4699
4700 msg := []byte("Hello")
4701 for i := 0; i < toSend; i++ {
4702 // Use plain NATS here for speed.
4703 nc.Publish("foo", msg)
4704 }
4705 nc.Flush()
4706
4707 // Create a consumer.
4708 sub, err := js.SubscribeSync("foo")
4709 if err != nil {
4710 t.Fatalf("Unexpected error: %v", err)
4711 }
4712 defer sub.Unsubscribe()
4713 expectedMaxAck, _, _ := sub.PendingLimits()
4714
4715 ci, err := sub.ConsumerInfo()
4716 if err != nil {
4717 t.Fatalf("Unexpected error: %v", err)
4718 }
4719 if ci.Config.MaxAckPending != expectedMaxAck {
4720 t.Fatalf("Expected MaxAckPending to be set to %d, got %d", expectedMaxAck, ci.Config.MaxAckPending)
4721 }
4722
4723 waitForPending := func(n int) {
4724 timeout := time.Now().Add(2 * time.Second)
4725 for time.Now().Before(timeout) {
4726 if msgs, _, _ := sub.Pending(); msgs == n {
4727 return
4728 }
4729 time.Sleep(10 * time.Millisecond)
4730 }
4731 msgs, _, _ := sub.Pending()
4732 t.Fatalf("Expected to receive %d messages, but got %d", n, msgs)
4733 }
4734
4735 waitForPending(expectedMaxAck)
4736 // We do it twice to make sure it does not go over.
4737 waitForPending(expectedMaxAck)
4738
4739 // Now make sure we can consume them all with no slow consumers etc.
4740 for i := 0; i < toSend; i++ {
4741 m, err := sub.NextMsg(time.Second)

Callers

nothing calls this directly

Calls 15

Fatalf · Method · 0.80
Unsubscribe · Method · 0.80
PendingLimits · Method · 0.80
Pending · Method · 0.80
NextMsg · Method · 0.80
RunBasicJetStreamServer · Function · 0.70
jsClient · Function · 0.70
AddStream · Method · 0.65
Publish · Method · 0.65
SubscribeSync · Method · 0.65
ConsumerInfo · Method · 0.65

Tested by

no test coverage detected