MCPcopy
hub / github.com/nats-io/nats.go / TestOrderedConsumerFetchNoWait

Function TestOrderedConsumerFetchNoWait

jetstream/test/ordered_test.go:1681–1786  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

1679}
1680
1681func TestOrderedConsumerFetchNoWait(t *testing.T) {
1682 testSubject := "FOO.123"
1683 testMsgs := []string{"m1", "m2", "m3", "m4", "m5"}
1684 publishTestMsgs := func(t *testing.T, js jetstream.JetStream) {
1685 for _, msg := range testMsgs {
1686 if _, err := js.Publish(context.Background(), testSubject, []byte(msg)); err != nil {
1687 t.Fatalf("Unexpected error during publish: %s", err)
1688 }
1689 }
1690 }
1691 t.Run("base usage, delete consumer", func(t *testing.T) {
1692 srv := RunBasicJetStreamServer()
1693 defer shutdownJSServerAndRemoveStorage(t, srv)
1694 nc, err := nats.Connect(srv.ClientURL())
1695 if err != nil {
1696 t.Fatalf("Unexpected error: %v", err)
1697 }
1698
1699 js, err := jetstream.New(nc)
1700 if err != nil {
1701 t.Fatalf("Unexpected error: %v", err)
1702 }
1703 defer nc.Close()
1704
1705 ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
1706 defer cancel()
1707 s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
1708 if err != nil {
1709 t.Fatalf("Unexpected error: %v", err)
1710 }
1711 c, err := s.OrderedConsumer(ctx, jetstream.OrderedConsumerConfig{})
1712 if err != nil {
1713 t.Fatalf("Unexpected error: %v", err)
1714 }
1715
1716 msgs := make([]jetstream.Msg, 0)
1717
1718 publishTestMsgs(t, js)
1719 res, err := c.FetchNoWait(5)
1720 if err != nil {
1721 t.Fatalf("Unexpected error: %s", err)
1722 }
1723
1724 for msg := range res.Messages() {
1725 msgs = append(msgs, msg)
1726 }
1727 if res.Error() != nil {
1728 t.Fatalf("Unexpected error: %s", err)
1729 }
1730 name := c.CachedInfo().Name
1731 if err := s.DeleteConsumer(ctx, name); err != nil {
1732 t.Fatal(err)
1733 }
1734 publishTestMsgs(t, js)
1735 res, err = c.FetchNoWait(5)
1736 if err != nil {
1737 t.Fatalf("Unexpected error: %s", err)
1738 }

Callers

nothing calls this directly

Calls 15

NewFunction · 0.92
FatalfMethod · 0.80
ConnectMethod · 0.80
RunBasicJetStreamServerFunction · 0.70
PublishMethod · 0.65
CreateStreamMethod · 0.65
OrderedConsumerMethod · 0.65
FetchNoWaitMethod · 0.65
MessagesMethod · 0.65
ErrorMethod · 0.65
CachedInfoMethod · 0.65

Tested by

no test coverage detected