(t *testing.T)
| 5266 | } |
| 5267 | |
| 5268 | func TestJetStreamPullSubscribe_AckPending(t *testing.T) { |
| 5269 | s := RunBasicJetStreamServer() |
| 5270 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 5271 | |
| 5272 | nc, js := jsClient(t, s) |
| 5273 | defer nc.Close() |
| 5274 | |
| 5275 | var err error |
| 5276 | |
| 5277 | // Create the stream using our client API. |
| 5278 | _, err = js.AddStream(&nats.StreamConfig{ |
| 5279 | Name: "TEST", |
| 5280 | Subjects: []string{"foo"}, |
| 5281 | }) |
| 5282 | if err != nil { |
| 5283 | t.Fatalf("Unexpected error: %v", err) |
| 5284 | } |
| 5285 | |
| 5286 | const totalMsgs = 10 |
| 5287 | for i := 0; i < totalMsgs; i++ { |
| 5288 | payload := fmt.Sprintf("i:%d", i) |
| 5289 | js.Publish("foo", []byte(payload)) |
| 5290 | } |
| 5291 | |
| 5292 | sub, err := js.PullSubscribe("foo", "wq", |
| 5293 | nats.AckWait(200*time.Millisecond), |
| 5294 | nats.MaxAckPending(5), |
| 5295 | ) |
| 5296 | if err != nil { |
| 5297 | t.Fatalf("Unexpected error: %v", err) |
| 5298 | } |
| 5299 | |
| 5300 | nextMsg := func() *nats.Msg { |
| 5301 | t.Helper() |
| 5302 | msgs, err := sub.Fetch(1) |
| 5303 | if err != nil { |
| 5304 | t.Fatal(err) |
| 5305 | } |
| 5306 | return msgs[0] |
| 5307 | } |
| 5308 | |
| 5309 | getPending := func() (int, int) { |
| 5310 | t.Helper() |
| 5311 | info, err := sub.ConsumerInfo() |
| 5312 | if err != nil { |
| 5313 | t.Fatal(err) |
| 5314 | } |
| 5315 | return info.NumAckPending, int(info.NumPending) |
| 5316 | } |
| 5317 | |
| 5318 | getMetadata := func(msg *nats.Msg) *nats.MsgMetadata { |
| 5319 | t.Helper() |
| 5320 | meta, err := msg.Metadata() |
| 5321 | if err != nil { |
| 5322 | t.Fatalf("Unexpected error: %v", err) |
| 5323 | } |
| 5324 | return meta |
| 5325 | } |
nothing calls this directly
no test coverage detected