MCPcopy
hub / github.com/nats-io/nats.go / TestPullSubscribeFetchDrain

Function TestPullSubscribeFetchDrain

test/js_test.go:1390–1446  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

1388}
1389
1390func TestPullSubscribeFetchDrain(t *testing.T) {
1391 s := RunBasicJetStreamServer()
1392 defer shutdownJSServerAndRemoveStorage(t, s)
1393
1394 nc, js := jsClient(t, s)
1395 defer nc.Close()
1396
1397 _, err := js.AddStream(&nats.StreamConfig{
1398 Name: "TEST",
1399 Subjects: []string{"foo"},
1400 })
1401 if err != nil {
1402 t.Fatalf("Unexpected error: %v", err)
1403 }
1404
1405 defer js.PurgeStream("TEST")
1406 sub, err := js.PullSubscribe("foo", "")
1407 if err != nil {
1408 t.Fatalf("Unexpected error: %s", err)
1409 }
1410 for i := 0; i < 100; i++ {
1411 if _, err := js.Publish("foo", []byte("msg")); err != nil {
1412 t.Fatalf("Unexpected error: %s", err)
1413 }
1414 }
1415 // fill buffer with messages
1416 cinfo, err := sub.ConsumerInfo()
1417 if err != nil {
1418 t.Fatalf("Unexpected error: %s", err)
1419 }
1420 nextSubject := fmt.Sprintf("$JS.API.CONSUMER.MSG.NEXT.TEST.%s", cinfo.Name)
1421 replySubject := strings.Replace(sub.Subject, "*", "abc", 1)
1422 payload := `{"batch":10,"no_wait":true}`
1423 if err := nc.PublishRequest(nextSubject, replySubject, []byte(payload)); err != nil {
1424 t.Fatalf("Unexpected error: %s", err)
1425 }
1426 time.Sleep(100 * time.Millisecond)
1427
1428 // now drain the subscription, messages should be in the buffer
1429 sub.Drain()
1430 msgs, err := sub.Fetch(100)
1431 if err != nil {
1432 t.Fatalf("Unexpected error: %s", err)
1433 }
1434 for _, msg := range msgs {
1435 msg.Ack()
1436 }
1437 if len(msgs) != 10 {
1438 t.Fatalf("Expected %d messages; got: %d", 10, len(msgs))
1439 }
1440
1441 // subsequent fetch should return error, subscription is already drained
1442 _, err = sub.Fetch(10, nats.MaxWait(100*time.Millisecond))
1443 if !errors.Is(err, nats.ErrSubscriptionClosed) {
1444 t.Fatalf("Expected error: %s; got: %s", nats.ErrSubscriptionClosed, err)
1445 }
1446}
1447

Callers

nothing calls this directly

Calls 15

FatalfMethod · 0.80
RunBasicJetStreamServerFunction · 0.70
jsClientFunction · 0.70
AddStreamMethod · 0.65
PurgeStreamMethod · 0.65
PullSubscribeMethod · 0.65
PublishMethod · 0.65
ConsumerInfoMethod · 0.65
DrainMethod · 0.65
FetchMethod · 0.65
AckMethod · 0.65

Tested by

no test coverage detected