MCPcopy
hub / github.com/nats-io/nats.go / TestPullSubscribeFetchBatch

Function TestPullSubscribeFetchBatch

test/js_test.go:1589–2020  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

1587}
1588
1589func TestPullSubscribeFetchBatch(t *testing.T) {
1590 s := RunBasicJetStreamServer()
1591 defer shutdownJSServerAndRemoveStorage(t, s)
1592
1593 nc, js := jsClient(t, s)
1594 defer nc.Close()
1595
1596 _, err := js.AddStream(&nats.StreamConfig{
1597 Name: "TEST",
1598 Subjects: []string{"foo"},
1599 })
1600 if err != nil {
1601 t.Fatalf("Unexpected error: %v", err)
1602 }
1603
1604 t.Run("basic fetch", func(t *testing.T) {
1605 defer js.PurgeStream("TEST")
1606 sub, err := js.PullSubscribe("foo", "")
1607 if err != nil {
1608 t.Fatalf("Unexpected error: %s", err)
1609 }
1610 for i := 0; i < 5; i++ {
1611 if _, err := js.Publish("foo", []byte("msg")); err != nil {
1612 t.Fatalf("Unexpected error: %s", err)
1613 }
1614 }
1615 res, err := sub.FetchBatch(10)
1616 if err != nil {
1617 t.Fatalf("Unexpected error: %s", err)
1618 }
1619 go func() {
1620 time.Sleep(10 * time.Millisecond)
1621 for i := 0; i < 5; i++ {
1622 js.Publish("foo", []byte("msg"))
1623 }
1624 }()
1625 msgs := make([]*nats.Msg, 0)
1626 for msg := range res.Messages() {
1627 msgs = append(msgs, msg)
1628 }
1629 if res.Error() != nil {
1630 t.Fatalf("Unexpected error: %s", res.Error())
1631 }
1632 if len(msgs) != 10 {
1633 t.Fatalf("Expected %d messages; got: %d", 10, len(msgs))
1634 }
1635 })
1636
1637 t.Run("multiple concurrent fetches", func(t *testing.T) {
1638 defer js.PurgeStream("TEST")
1639 sub, err := js.PullSubscribe("foo", "")
1640 if err != nil {
1641 t.Fatalf("Unexpected error: %s", err)
1642 }
1643 for i := 0; i < 50; i++ {
1644 if _, err := js.Publish("foo", []byte("msg")); err != nil {
1645 t.Fatalf("Unexpected error: %s", err)
1646 }

Callers

nothing calls this directly

Calls 15

Done · Method · 0.95
Messages · Method · 0.95
Error · Method · 0.95
Fatalf · Method · 0.80
FetchBatch · Method · 0.80
Errorf · Method · 0.80
RunBasicJetStreamServer · Function · 0.70
jsClient · Function · 0.70
AddStream · Method · 0.65
PurgeStream · Method · 0.65
PullSubscribe · Method · 0.65

Tested by

no test coverage detected