(t *testing.T)
| 550 | } |
| 551 | |
| 552 | func TestConsumerPinned(t *testing.T) { |
| 553 | t.Run("messages", func(t *testing.T) { |
| 554 | srv := RunBasicJetStreamServer() |
| 555 | defer shutdownJSServerAndRemoveStorage(t, srv) |
| 556 | |
| 557 | nc, err := nats.Connect(srv.ClientURL()) |
| 558 | if err != nil { |
| 559 | t.Fatalf("Unexpected error: %v", err) |
| 560 | } |
| 561 | defer nc.Close() |
| 562 | |
| 563 | ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) |
| 564 | defer cancel() |
| 565 | |
| 566 | js, err := jetstream.New(nc) |
| 567 | if err != nil { |
| 568 | t.Fatalf("Unexpected error: %v", err) |
| 569 | } |
| 570 | |
| 571 | s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) |
| 572 | if err != nil { |
| 573 | t.Fatalf("Unexpected error: %v", err) |
| 574 | } |
| 575 | c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ |
| 576 | Durable: "cons", |
| 577 | AckPolicy: jetstream.AckExplicitPolicy, |
| 578 | Description: "test consumer", |
| 579 | PriorityPolicy: jetstream.PriorityPolicyPinned, |
| 580 | PinnedTTL: time.Second, |
| 581 | PriorityGroups: []string{"A"}, |
| 582 | }) |
| 583 | if err != nil { |
| 584 | t.Fatalf("Unexpected error: %v", err) |
| 585 | } |
| 586 | |
| 587 | for range 1000 { |
| 588 | _, err = js.Publish(ctx, "FOO.bar", []byte("hello")) |
| 589 | if err != nil { |
| 590 | t.Fatalf("Unexpected error: %v", err) |
| 591 | } |
| 592 | } |
| 593 | |
| 594 | gcount := make(chan struct{}, 1000) |
| 595 | count := atomic.Uint32{} |
| 596 | |
| 597 | // Initially pinned consumer instance |
| 598 | initiallyPinned, err := s.Consumer(ctx, "cons") |
| 599 | if err != nil { |
| 600 | t.Fatalf("Unexpected error: %v", err) |
| 601 | } |
| 602 | |
| 603 | handler := func(it jetstream.MessagesContext, counter *atomic.Uint32, doneCh chan struct{}) { |
| 604 | for { |
| 605 | msg, err := it.Next() |
| 606 | if err != nil { |
| 607 | break |
| 608 | } |
| 609 | if err := msg.Ack(); err != nil { |
nothing calls this directly
no test coverage detected