MCPcopy
hub / github.com/nats-io/nats.go / TestPullConsumerMessages

Function TestPullConsumerMessages

jetstream/test/pull_test.go:1105–2198  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

1103}
1104
1105func TestPullConsumerMessages(t *testing.T) {
1106 testSubject := "FOO.123"
1107 testMsgs := []string{"m1", "m2", "m3", "m4", "m5"}
1108 publishTestMsgs := func(t *testing.T, js jetstream.JetStream) {
1109 for _, msg := range testMsgs {
1110 if _, err := js.Publish(context.Background(), testSubject, []byte(msg)); err != nil {
1111 t.Fatalf("Unexpected error during publish: %s", err)
1112 }
1113 }
1114 }
1115
1116 t.Run("no options", func(t *testing.T) {
1117 srv := RunBasicJetStreamServer()
1118 defer shutdownJSServerAndRemoveStorage(t, srv)
1119 nc, err := nats.Connect(srv.ClientURL())
1120 if err != nil {
1121 t.Fatalf("Unexpected error: %v", err)
1122 }
1123
1124 js, err := jetstream.New(nc)
1125 if err != nil {
1126 t.Fatalf("Unexpected error: %v", err)
1127 }
1128 defer nc.Close()
1129
1130 ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
1131 defer cancel()
1132 s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
1133 if err != nil {
1134 t.Fatalf("Unexpected error: %v", err)
1135 }
1136 c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy})
1137 if err != nil {
1138 t.Fatalf("Unexpected error: %v", err)
1139 }
1140
1141 msgs := make([]jetstream.Msg, 0)
1142 it, err := c.Messages()
1143 if err != nil {
1144 t.Fatalf("Unexpected error: %v", err)
1145 }
1146
1147 publishTestMsgs(t, js)
1148 for i := 0; i < len(testMsgs); i++ {
1149 msg, err := it.Next()
1150 if err != nil {
1151 t.Fatal(err)
1152 }
1153 if msg == nil {
1154 break
1155 }
1156 msg.Ack()
1157 msgs = append(msgs, msg)
1158
1159 }
1160 it.Stop()
1161
1162 // calling Stop() multiple times should have no effect

Callers

nothing calls this directly

Calls 15

New (Function) · 0.92
PullMaxMessages (TypeAlias) · 0.92
PullMaxBytes (TypeAlias) · 0.92
StopAfter (TypeAlias) · 0.92
PullHeartbeat (TypeAlias) · 0.92
Fatalf (Method) · 0.80
Connect (Method) · 0.80
Pending (Method) · 0.80
Unsubscribe (Method) · 0.80
NextMsg (Method) · 0.80
RunBasicJetStreamServer (Function) · 0.70

Tested by

no test coverage detected