(t *testing.T)
| 10808 | } |
| 10809 | |
| 10810 | func TestJetStreamSyncSubscribeWithMaxAckPending(t *testing.T) { |
| 10811 | opts := natsserver.DefaultTestOptions |
| 10812 | opts.Port = -1 |
| 10813 | opts.JetStream = true |
| 10814 | opts.JetStreamLimits.MaxAckPending = 123 |
| 10815 | s := RunServerWithOptions(&opts) |
| 10816 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 10817 | |
| 10818 | nc, js := jsClient(t, s) |
| 10819 | defer nc.Close() |
| 10820 | |
| 10821 | if _, err := js.AddStream(&nats.StreamConfig{Name: "MAX_ACK_PENDING", Subjects: []string{"foo"}}); err != nil { |
| 10822 | t.Fatalf("Error adding stream: %v", err) |
| 10823 | } |
| 10824 | |
| 10825 | // By default, the sync subscription will be created with a MaxAckPending equal |
| 10826 | // to the internal sync queue len, which is 64K. So that should error out |
| 10827 | // and make sure we get the actual limit |
| 10828 | |
| 10829 | checkSub := func(pull bool) { |
| 10830 | var sub *nats.Subscription |
| 10831 | var err error |
| 10832 | if pull { |
| 10833 | _, err = js.PullSubscribe("foo", "bar") |
| 10834 | } else { |
| 10835 | _, err = js.SubscribeSync("foo") |
| 10836 | } |
| 10837 | if err == nil || !strings.Contains(err.Error(), "system limit of 123") { |
| 10838 | t.Fatalf("Unexpected error: %v", err) |
| 10839 | } |
| 10840 | |
| 10841 | // But it should work if we use MaxAckPending() with lower value |
| 10842 | if pull { |
| 10843 | sub, err = js.PullSubscribe("foo", "bar", nats.MaxAckPending(64)) |
| 10844 | } else { |
| 10845 | sub, err = js.SubscribeSync("foo", nats.MaxAckPending(64)) |
| 10846 | } |
| 10847 | if err != nil { |
| 10848 | t.Fatalf("Unexpected error: %v", err) |
| 10849 | } |
| 10850 | sub.Unsubscribe() |
| 10851 | } |
| 10852 | checkSub(false) |
| 10853 | checkSub(true) |
| 10854 | } |
| 10855 | |
| 10856 | func TestJetStreamClusterPlacement(t *testing.T) { |
| 10857 | // There used to be a test here that would not work because it would require |
nothing calls this directly
no test coverage detected