MCPcopy
hub / github.com/nats-io/nats.go / TestOrderedConsumerMessages

Function TestOrderedConsumerMessages

jetstream/test/ordered_test.go:703–1170  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

701}
702
703func TestOrderedConsumerMessages(t *testing.T) {
704 testSubject := "FOO.123"
705 testMsgs := []string{"m1", "m2", "m3", "m4", "m5"}
706 publishTestMsgs := func(t *testing.T, js jetstream.JetStream) {
707 for _, msg := range testMsgs {
708 if _, err := js.Publish(context.Background(), testSubject, []byte(msg)); err != nil {
709 t.Fatalf("Unexpected error during publish: %s", err)
710 }
711 }
712 }
713 t.Run("base usage, delete consumer", func(t *testing.T) {
714 srv := RunBasicJetStreamServer()
715 defer shutdownJSServerAndRemoveStorage(t, srv)
716 nc, err := nats.Connect(srv.ClientURL())
717 if err != nil {
718 t.Fatalf("Unexpected error: %v", err)
719 }
720
721 js, err := jetstream.New(nc)
722 if err != nil {
723 t.Fatalf("Unexpected error: %v", err)
724 }
725 defer nc.Close()
726
727 ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
728 defer cancel()
729 s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
730 if err != nil {
731 t.Fatalf("Unexpected error: %v", err)
732 }
733 c, err := s.OrderedConsumer(ctx, jetstream.OrderedConsumerConfig{})
734 if err != nil {
735 t.Fatalf("Unexpected error: %v", err)
736 }
737
738 msgs := make([]jetstream.Msg, 0)
739 it, err := c.Messages()
740 if err != nil {
741 t.Fatalf("Unexpected error: %v", err)
742 }
743 defer it.Stop()
744
745 publishTestMsgs(t, js)
746 for i := 0; i < 5; i++ {
747 msg, err := it.Next()
748 if err != nil {
749 t.Fatalf("Unexpected error: %s", err)
750 }
751 msgs = append(msgs, msg)
752 }
753 name := c.CachedInfo().Name
754 if err := s.DeleteConsumer(ctx, name); err != nil {
755 t.Fatal(err)
756 }
757 publishTestMsgs(t, js)
758 for i := 0; i < 5; i++ {
759 msg, err := it.Next()
760 if err != nil {

Callers

nothing calls this directly

Calls 15

NewFunction · 0.92
PullHeartbeatTypeAlias · 0.92
StopAfterTypeAlias · 0.92
PullMaxMessagesTypeAlias · 0.92
FatalfMethod · 0.80
ConnectMethod · 0.80
RunBasicJetStreamServerFunction · 0.70
restartBasicJSServerFunction · 0.70
PublishMethod · 0.65
CreateStreamMethod · 0.65
OrderedConsumerMethod · 0.65

Tested by

no test coverage detected