MCPcopy
hub / github.com/nats-io/nats.go / TestJetStreamPullSubscribe_AckPending

Function TestJetStreamPullSubscribe_AckPending

test/js_test.go:5268–5486  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

5266}
5267
5268func TestJetStreamPullSubscribe_AckPending(t *testing.T) {
5269 s := RunBasicJetStreamServer()
5270 defer shutdownJSServerAndRemoveStorage(t, s)
5271
5272 nc, js := jsClient(t, s)
5273 defer nc.Close()
5274
5275 var err error
5276
5277 // Create the stream using our client API.
5278 _, err = js.AddStream(&nats.StreamConfig{
5279 Name: "TEST",
5280 Subjects: []string{"foo"},
5281 })
5282 if err != nil {
5283 t.Fatalf("Unexpected error: %v", err)
5284 }
5285
5286 const totalMsgs = 10
5287 for i := 0; i < totalMsgs; i++ {
5288 payload := fmt.Sprintf("i:%d", i)
5289 js.Publish("foo", []byte(payload))
5290 }
5291
5292 sub, err := js.PullSubscribe("foo", "wq",
5293 nats.AckWait(200*time.Millisecond),
5294 nats.MaxAckPending(5),
5295 )
5296 if err != nil {
5297 t.Fatalf("Unexpected error: %v", err)
5298 }
5299
5300 nextMsg := func() *nats.Msg {
5301 t.Helper()
5302 msgs, err := sub.Fetch(1)
5303 if err != nil {
5304 t.Fatal(err)
5305 }
5306 return msgs[0]
5307 }
5308
5309 getPending := func() (int, int) {
5310 t.Helper()
5311 info, err := sub.ConsumerInfo()
5312 if err != nil {
5313 t.Fatal(err)
5314 }
5315 return info.NumAckPending, int(info.NumPending)
5316 }
5317
5318 getMetadata := func(msg *nats.Msg) *nats.MsgMetadata {
5319 t.Helper()
5320 meta, err := msg.Metadata()
5321 if err != nil {
5322 t.Fatalf("Unexpected error: %v", err)
5323 }
5324 return meta
5325 }

Callers

nothing calls this directly

Calls 15

Fatalf · Method · 0.80
Errorf · Method · 0.80
AckSync · Method · 0.80
RunBasicJetStreamServer · Function · 0.70
jsClient · Function · 0.70
checkFor · Function · 0.70
AddStream · Method · 0.65
Publish · Method · 0.65
PullSubscribe · Method · 0.65
Fetch · Method · 0.65
ConsumerInfo · Method · 0.65

Tested by

no test coverage detected