(t *testing.T)
| 1587 | } |
| 1588 | |
| 1589 | func TestPullSubscribeFetchBatch(t *testing.T) { |
| 1590 | s := RunBasicJetStreamServer() |
| 1591 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 1592 | |
| 1593 | nc, js := jsClient(t, s) |
| 1594 | defer nc.Close() |
| 1595 | |
| 1596 | _, err := js.AddStream(&nats.StreamConfig{ |
| 1597 | Name: "TEST", |
| 1598 | Subjects: []string{"foo"}, |
| 1599 | }) |
| 1600 | if err != nil { |
| 1601 | t.Fatalf("Unexpected error: %v", err) |
| 1602 | } |
| 1603 | |
| 1604 | t.Run("basic fetch", func(t *testing.T) { |
| 1605 | defer js.PurgeStream("TEST") |
| 1606 | sub, err := js.PullSubscribe("foo", "") |
| 1607 | if err != nil { |
| 1608 | t.Fatalf("Unexpected error: %s", err) |
| 1609 | } |
| 1610 | for i := 0; i < 5; i++ { |
| 1611 | if _, err := js.Publish("foo", []byte("msg")); err != nil { |
| 1612 | t.Fatalf("Unexpected error: %s", err) |
| 1613 | } |
| 1614 | } |
| 1615 | res, err := sub.FetchBatch(10) |
| 1616 | if err != nil { |
| 1617 | t.Fatalf("Unexpected error: %s", err) |
| 1618 | } |
| 1619 | go func() { |
| 1620 | time.Sleep(10 * time.Millisecond) |
| 1621 | for i := 0; i < 5; i++ { |
| 1622 | js.Publish("foo", []byte("msg")) |
| 1623 | } |
| 1624 | }() |
| 1625 | msgs := make([]*nats.Msg, 0) |
| 1626 | for msg := range res.Messages() { |
| 1627 | msgs = append(msgs, msg) |
| 1628 | } |
| 1629 | if res.Error() != nil { |
| 1630 | t.Fatalf("Unexpected error: %s", res.Error()) |
| 1631 | } |
| 1632 | if len(msgs) != 10 { |
| 1633 | t.Fatalf("Expected %d messages; got: %d", 10, len(msgs)) |
| 1634 | } |
| 1635 | }) |
| 1636 | |
| 1637 | t.Run("multiple concurrent fetches", func(t *testing.T) { |
| 1638 | defer js.PurgeStream("TEST") |
| 1639 | sub, err := js.PullSubscribe("foo", "") |
| 1640 | if err != nil { |
| 1641 | t.Fatalf("Unexpected error: %s", err) |
| 1642 | } |
| 1643 | for i := 0; i < 50; i++ { |
| 1644 | if _, err := js.Publish("foo", []byte("msg")); err != nil { |
| 1645 | t.Fatalf("Unexpected error: %s", err) |
| 1646 | } |
nothing calls this directly
no test coverage detected