(t *testing.T)
| 1103 | } |
| 1104 | |
| 1105 | func TestPullConsumerMessages(t *testing.T) { |
| 1106 | testSubject := "FOO.123" |
| 1107 | testMsgs := []string{"m1", "m2", "m3", "m4", "m5"} |
| 1108 | publishTestMsgs := func(t *testing.T, js jetstream.JetStream) { |
| 1109 | for _, msg := range testMsgs { |
| 1110 | if _, err := js.Publish(context.Background(), testSubject, []byte(msg)); err != nil { |
| 1111 | t.Fatalf("Unexpected error during publish: %s", err) |
| 1112 | } |
| 1113 | } |
| 1114 | } |
| 1115 | |
| 1116 | t.Run("no options", func(t *testing.T) { |
| 1117 | srv := RunBasicJetStreamServer() |
| 1118 | defer shutdownJSServerAndRemoveStorage(t, srv) |
| 1119 | nc, err := nats.Connect(srv.ClientURL()) |
| 1120 | if err != nil { |
| 1121 | t.Fatalf("Unexpected error: %v", err) |
| 1122 | } |
| 1123 | |
| 1124 | js, err := jetstream.New(nc) |
| 1125 | if err != nil { |
| 1126 | t.Fatalf("Unexpected error: %v", err) |
| 1127 | } |
| 1128 | defer nc.Close() |
| 1129 | |
| 1130 | ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) |
| 1131 | defer cancel() |
| 1132 | s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) |
| 1133 | if err != nil { |
| 1134 | t.Fatalf("Unexpected error: %v", err) |
| 1135 | } |
| 1136 | c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) |
| 1137 | if err != nil { |
| 1138 | t.Fatalf("Unexpected error: %v", err) |
| 1139 | } |
| 1140 | |
| 1141 | msgs := make([]jetstream.Msg, 0) |
| 1142 | it, err := c.Messages() |
| 1143 | if err != nil { |
| 1144 | t.Fatalf("Unexpected error: %v", err) |
| 1145 | } |
| 1146 | |
| 1147 | publishTestMsgs(t, js) |
| 1148 | for i := 0; i < len(testMsgs); i++ { |
| 1149 | msg, err := it.Next() |
| 1150 | if err != nil { |
| 1151 | t.Fatal(err) |
| 1152 | } |
| 1153 | if msg == nil { |
| 1154 | break |
| 1155 | } |
| 1156 | msg.Ack() |
| 1157 | msgs = append(msgs, msg) |
| 1158 | |
| 1159 | } |
| 1160 | it.Stop() |
| 1161 | |
| 1162 | // calling Stop() multiple times should have no effect |
nothing calls this directly
no test coverage detected