MCPcopy
hub / github.com/nats-io/nats.go / TestConsumerPinned

Function TestConsumerPinned

jetstream/test/consumer_test.go:552–995  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

550}
551
552func TestConsumerPinned(t *testing.T) {
553 t.Run("messages", func(t *testing.T) {
554 srv := RunBasicJetStreamServer()
555 defer shutdownJSServerAndRemoveStorage(t, srv)
556
557 nc, err := nats.Connect(srv.ClientURL())
558 if err != nil {
559 t.Fatalf("Unexpected error: %v", err)
560 }
561 defer nc.Close()
562
563 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
564 defer cancel()
565
566 js, err := jetstream.New(nc)
567 if err != nil {
568 t.Fatalf("Unexpected error: %v", err)
569 }
570
571 s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
572 if err != nil {
573 t.Fatalf("Unexpected error: %v", err)
574 }
575 c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
576 Durable: "cons",
577 AckPolicy: jetstream.AckExplicitPolicy,
578 Description: "test consumer",
579 PriorityPolicy: jetstream.PriorityPolicyPinned,
580 PinnedTTL: time.Second,
581 PriorityGroups: []string{"A"},
582 })
583 if err != nil {
584 t.Fatalf("Unexpected error: %v", err)
585 }
586
587 for range 1000 {
588 _, err = js.Publish(ctx, "FOO.bar", []byte("hello"))
589 if err != nil {
590 t.Fatalf("Unexpected error: %v", err)
591 }
592 }
593
594 gcount := make(chan struct{}, 1000)
595 count := atomic.Uint32{}
596
597 // Initially pinned consumer instance
598 initiallyPinned, err := s.Consumer(ctx, "cons")
599 if err != nil {
600 t.Fatalf("Unexpected error: %v", err)
601 }
602
603 handler := func(it jetstream.MessagesContext, counter *atomic.Uint32, doneCh chan struct{}) {
604 for {
605 msg, err := it.Next()
606 if err != nil {
607 break
608 }
609 if err := msg.Ack(); err != nil {

Callers

nothing calls this directly

Calls 15

NewFunction · 0.92
PullPriorityGroupTypeAlias · 0.92
PullHeartbeatTypeAlias · 0.92
PullThresholdMessagesTypeAlias · 0.92
FetchPriorityGroupFunction · 0.92
FetchMaxWaitFunction · 0.92
ConnectMethod · 0.80
FatalfMethod · 0.80
StoreMethod · 0.80
LoadMethod · 0.80
RunBasicJetStreamServerFunction · 0.70

Tested by

no test coverage detected