MCPcopy
hub / github.com/nats-io/nats.go / TestPullConsumerConsume_WithCluster

Function TestPullConsumerConsume_WithCluster

jetstream/test/pull_test.go:3319–3536  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

3317}
3318
3319func TestPullConsumerConsume_WithCluster(t *testing.T) {
3320 testSubject := "FOO.123"
3321 testMsgs := []string{"m1", "m2", "m3", "m4", "m5"}
3322 publishTestMsgs := func(t *testing.T, js jetstream.JetStream) {
3323 for _, msg := range testMsgs {
3324 if _, err := js.Publish(context.Background(), testSubject, []byte(msg)); err != nil {
3325 t.Fatalf("Unexpected error during publish: %s", err)
3326 }
3327 }
3328 }
3329
3330 name := "cluster"
3331 singleStream := jetstream.StreamConfig{
3332 Name: name,
3333 Replicas: 1,
3334 Subjects: []string{"FOO.*"},
3335 }
3336
3337 streamWithReplicas := jetstream.StreamConfig{
3338 Name: name,
3339 Replicas: 3,
3340 Subjects: []string{"FOO.*"},
3341 }
3342
3343 for _, stream := range []jetstream.StreamConfig{singleStream, streamWithReplicas} {
3344 t.Run(fmt.Sprintf("num replicas: %d, no options", stream.Replicas), func(t *testing.T) {
3345 withJSClusterAndStream(t, name, 3, stream, func(t *testing.T, subject string, srvs ...*jsServer) {
3346 nc, err := nats.Connect(srvs[0].ClientURL())
3347 if err != nil {
3348 t.Fatalf("Unexpected error: %v", err)
3349 }
3350
3351 js, err := jetstream.New(nc)
3352 if err != nil {
3353 t.Fatalf("Unexpected error: %v", err)
3354 }
3355 defer nc.Close()
3356
3357 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
3358 defer cancel()
3359 s, err := js.Stream(ctx, stream.Name)
3360 if err != nil {
3361 t.Fatalf("Unexpected error: %v", err)
3362 }
3363 c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy})
3364 if err != nil {
3365 t.Fatalf("Unexpected error: %v", err)
3366 }
3367
3368 msgs := make([]jetstream.Msg, 0)
3369 wg := &sync.WaitGroup{}
3370 wg.Add(len(testMsgs))
3371 l, err := c.Consume(func(msg jetstream.Msg) {
3372 msgs = append(msgs, msg)
3373 wg.Done()
3374 })
3375 if err != nil {
3376 t.Fatalf("Unexpected error: %v", err)

Callers

nothing calls this directly

Calls 15

New (Function) · 0.92
PullExpiry (TypeAlias) · 0.92
PullHeartbeat (TypeAlias) · 0.92
Fatalf (Method) · 0.80
Connect (Method) · 0.80
withJSClusterAndStream (Function) · 0.70
Publish (Method) · 0.65
Stream (Method) · 0.65
Add (Method) · 0.65
Consume (Method) · 0.65
Done (Method) · 0.65

Tested by

no test coverage detected