(t *testing.T)
| 995 | } |
| 996 | |
| 997 | func TestConsumerUnpin(t *testing.T) { |
| 998 | t.Run("unpin consumer", func(t *testing.T) { |
| 999 | srv := RunBasicJetStreamServer() |
| 1000 | defer shutdownJSServerAndRemoveStorage(t, srv) |
| 1001 | |
| 1002 | nc, err := nats.Connect(srv.ClientURL()) |
| 1003 | if err != nil { |
| 1004 | t.Fatalf("Unexpected error: %v", err) |
| 1005 | } |
| 1006 | defer nc.Close() |
| 1007 | |
| 1008 | ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) |
| 1009 | defer cancel() |
| 1010 | |
| 1011 | js, err := jetstream.New(nc) |
| 1012 | if err != nil { |
| 1013 | t.Fatalf("Unexpected error: %v", err) |
| 1014 | } |
| 1015 | |
| 1016 | s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) |
| 1017 | if err != nil { |
| 1018 | t.Fatalf("Unexpected error: %v", err) |
| 1019 | } |
| 1020 | c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ |
| 1021 | Durable: "cons", |
| 1022 | AckPolicy: jetstream.AckExplicitPolicy, |
| 1023 | Description: "test consumer", |
| 1024 | PriorityPolicy: jetstream.PriorityPolicyPinned, |
| 1025 | PinnedTTL: 50 * time.Second, |
| 1026 | PriorityGroups: []string{"A"}, |
| 1027 | }) |
| 1028 | if err != nil { |
| 1029 | t.Fatalf("Unexpected error: %v", err) |
| 1030 | } |
| 1031 | |
| 1032 | for range 1000 { |
| 1033 | _, err = js.Publish(ctx, "FOO.bar", []byte("hello")) |
| 1034 | if err != nil { |
| 1035 | t.Fatalf("Unexpected error: %v", err) |
| 1036 | } |
| 1037 | } |
| 1038 | |
| 1039 | msgs, err := c.Messages(jetstream.PullPriorityGroup("A")) |
| 1040 | if err != nil { |
| 1041 | t.Fatalf("Unexpected error: %v", err) |
| 1042 | } |
| 1043 | defer msgs.Stop() |
| 1044 | |
| 1045 | msg, err := msgs.Next() |
| 1046 | if err != nil { |
| 1047 | t.Fatalf("Unexpected error: %v", err) |
| 1048 | } |
| 1049 | |
| 1050 | firstPinID := msg.Headers().Get("Nats-Pin-Id") |
| 1051 | if firstPinID == "" { |
| 1052 | t.Fatalf("Expected pinned message") |
| 1053 | } |
| 1054 |
nothing calls this directly
no test coverage detected