MCPcopy
hub / github.com/nats-io/nats.go / TestConsumerUnpin

Function TestConsumerUnpin

jetstream/test/consumer_test.go:997–1159  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

995}
996
997func TestConsumerUnpin(t *testing.T) {
998 t.Run("unpin consumer", func(t *testing.T) {
999 srv := RunBasicJetStreamServer()
1000 defer shutdownJSServerAndRemoveStorage(t, srv)
1001
1002 nc, err := nats.Connect(srv.ClientURL())
1003 if err != nil {
1004 t.Fatalf("Unexpected error: %v", err)
1005 }
1006 defer nc.Close()
1007
1008 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
1009 defer cancel()
1010
1011 js, err := jetstream.New(nc)
1012 if err != nil {
1013 t.Fatalf("Unexpected error: %v", err)
1014 }
1015
1016 s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
1017 if err != nil {
1018 t.Fatalf("Unexpected error: %v", err)
1019 }
1020 c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
1021 Durable: "cons",
1022 AckPolicy: jetstream.AckExplicitPolicy,
1023 Description: "test consumer",
1024 PriorityPolicy: jetstream.PriorityPolicyPinned,
1025 PinnedTTL: 50 * time.Second,
1026 PriorityGroups: []string{"A"},
1027 })
1028 if err != nil {
1029 t.Fatalf("Unexpected error: %v", err)
1030 }
1031
1032 for range 1000 {
1033 _, err = js.Publish(ctx, "FOO.bar", []byte("hello"))
1034 if err != nil {
1035 t.Fatalf("Unexpected error: %v", err)
1036 }
1037 }
1038
1039 msgs, err := c.Messages(jetstream.PullPriorityGroup("A"))
1040 if err != nil {
1041 t.Fatalf("Unexpected error: %v", err)
1042 }
1043 defer msgs.Stop()
1044
1045 msg, err := msgs.Next()
1046 if err != nil {
1047 t.Fatalf("Unexpected error: %v", err)
1048 }
1049
1050 firstPinID := msg.Headers().Get("Nats-Pin-Id")
1051 if firstPinID == "" {
1052 t.Fatalf("Expected pinned message")
1053 }
1054

Callers

nothing calls this directly

Calls 15

New · Function · 0.92
PullPriorityGroup · TypeAlias · 0.92
PullExpiry · TypeAlias · 0.92
Connect · Method · 0.80
Fatalf · Method · 0.80
Errorf · Method · 0.80
RunBasicJetStreamServer · Function · 0.70
CreateStream · Method · 0.65
Publish · Method · 0.65
Messages · Method · 0.65

Tested by

no test coverage detected