(t *testing.T)
| 2198 | } |
| 2199 | |
| 2200 | func TestPullConsumerConsume(t *testing.T) { |
| 2201 | testSubject := "FOO.123" |
| 2202 | testMsgs := []string{"m1", "m2", "m3", "m4", "m5"} |
| 2203 | publishTestMsgs := func(t *testing.T, js jetstream.JetStream) { |
| 2204 | for _, msg := range testMsgs { |
| 2205 | if _, err := js.Publish(context.Background(), testSubject, []byte(msg)); err != nil { |
| 2206 | t.Fatalf("Unexpected error during publish: %s", err) |
| 2207 | } |
| 2208 | } |
| 2209 | } |
| 2210 | |
| 2211 | t.Run("no options", func(t *testing.T) { |
| 2212 | srv := RunBasicJetStreamServer() |
| 2213 | defer shutdownJSServerAndRemoveStorage(t, srv) |
| 2214 | nc, err := nats.Connect(srv.ClientURL()) |
| 2215 | if err != nil { |
| 2216 | t.Fatalf("Unexpected error: %v", err) |
| 2217 | } |
| 2218 | |
| 2219 | js, err := jetstream.New(nc) |
| 2220 | if err != nil { |
| 2221 | t.Fatalf("Unexpected error: %v", err) |
| 2222 | } |
| 2223 | defer nc.Close() |
| 2224 | |
| 2225 | ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) |
| 2226 | defer cancel() |
| 2227 | s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) |
| 2228 | if err != nil { |
| 2229 | t.Fatalf("Unexpected error: %v", err) |
| 2230 | } |
| 2231 | c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) |
| 2232 | if err != nil { |
| 2233 | t.Fatalf("Unexpected error: %v", err) |
| 2234 | } |
| 2235 | |
| 2236 | msgs := make([]jetstream.Msg, 0) |
| 2237 | wg := &sync.WaitGroup{} |
| 2238 | wg.Add(len(testMsgs)) |
| 2239 | l, err := c.Consume(func(msg jetstream.Msg) { |
| 2240 | msgs = append(msgs, msg) |
| 2241 | wg.Done() |
| 2242 | }) |
| 2243 | if err != nil { |
| 2244 | t.Fatalf("Unexpected error: %v", err) |
| 2245 | } |
| 2246 | defer l.Stop() |
| 2247 | |
| 2248 | publishTestMsgs(t, js) |
| 2249 | wg.Wait() |
| 2250 | if len(msgs) != len(testMsgs) { |
| 2251 | t.Fatalf("Unexpected received message count; want %d; got %d", len(testMsgs), len(msgs)) |
| 2252 | } |
| 2253 | for i, msg := range msgs { |
| 2254 | if string(msg.Data()) != testMsgs[i] { |
| 2255 | t.Fatalf("Invalid msg on index %d; expected: %s; got: %s", i, testMsgs[i], string(msg.Data())) |
| 2256 | } |
| 2257 | } |
nothing calls this directly
no test coverage detected