(t *testing.T)
| 3317 | } |
| 3318 | |
| 3319 | func TestPullConsumerConsume_WithCluster(t *testing.T) { |
| 3320 | testSubject := "FOO.123" |
| 3321 | testMsgs := []string{"m1", "m2", "m3", "m4", "m5"} |
| 3322 | publishTestMsgs := func(t *testing.T, js jetstream.JetStream) { |
| 3323 | for _, msg := range testMsgs { |
| 3324 | if _, err := js.Publish(context.Background(), testSubject, []byte(msg)); err != nil { |
| 3325 | t.Fatalf("Unexpected error during publish: %s", err) |
| 3326 | } |
| 3327 | } |
| 3328 | } |
| 3329 | |
| 3330 | name := "cluster" |
| 3331 | singleStream := jetstream.StreamConfig{ |
| 3332 | Name: name, |
| 3333 | Replicas: 1, |
| 3334 | Subjects: []string{"FOO.*"}, |
| 3335 | } |
| 3336 | |
| 3337 | streamWithReplicas := jetstream.StreamConfig{ |
| 3338 | Name: name, |
| 3339 | Replicas: 3, |
| 3340 | Subjects: []string{"FOO.*"}, |
| 3341 | } |
| 3342 | |
| 3343 | for _, stream := range []jetstream.StreamConfig{singleStream, streamWithReplicas} { |
| 3344 | t.Run(fmt.Sprintf("num replicas: %d, no options", stream.Replicas), func(t *testing.T) { |
| 3345 | withJSClusterAndStream(t, name, 3, stream, func(t *testing.T, subject string, srvs ...*jsServer) { |
| 3346 | nc, err := nats.Connect(srvs[0].ClientURL()) |
| 3347 | if err != nil { |
| 3348 | t.Fatalf("Unexpected error: %v", err) |
| 3349 | } |
| 3350 | |
| 3351 | js, err := jetstream.New(nc) |
| 3352 | if err != nil { |
| 3353 | t.Fatalf("Unexpected error: %v", err) |
| 3354 | } |
| 3355 | defer nc.Close() |
| 3356 | |
| 3357 | ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) |
| 3358 | defer cancel() |
| 3359 | s, err := js.Stream(ctx, stream.Name) |
| 3360 | if err != nil { |
| 3361 | t.Fatalf("Unexpected error: %v", err) |
| 3362 | } |
| 3363 | c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) |
| 3364 | if err != nil { |
| 3365 | t.Fatalf("Unexpected error: %v", err) |
| 3366 | } |
| 3367 | |
| 3368 | msgs := make([]jetstream.Msg, 0) |
| 3369 | wg := &sync.WaitGroup{} |
| 3370 | wg.Add(len(testMsgs)) |
| 3371 | l, err := c.Consume(func(msg jetstream.Msg) { |
| 3372 | msgs = append(msgs, msg) |
| 3373 | wg.Done() |
| 3374 | }) |
| 3375 | if err != nil { |
| 3376 | t.Fatalf("Unexpected error: %v", err) |
nothing calls this directly
no test coverage detected