(t *testing.T)
| 152 | } |
| 153 | |
| 154 | func TestConsumerOverflow(t *testing.T) { |
| 155 | t.Run("fetch", func(t *testing.T) { |
| 156 | srv := RunBasicJetStreamServer() |
| 157 | defer shutdownJSServerAndRemoveStorage(t, srv) |
| 158 | |
| 159 | nc, err := nats.Connect(srv.ClientURL()) |
| 160 | if err != nil { |
| 161 | t.Fatalf("Unexpected error: %v", err) |
| 162 | } |
| 163 | defer nc.Close() |
| 164 | |
| 165 | ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) |
| 166 | defer cancel() |
| 167 | |
| 168 | js, err := jetstream.New(nc) |
| 169 | if err != nil { |
| 170 | t.Fatalf("Unexpected error: %v", err) |
| 171 | } |
| 172 | |
| 173 | s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) |
| 174 | if err != nil { |
| 175 | t.Fatalf("Unexpected error: %v", err) |
| 176 | } |
| 177 | |
| 178 | c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ |
| 179 | PriorityPolicy: jetstream.PriorityPolicyOverflow, |
| 180 | PriorityGroups: []string{"A"}, |
| 181 | }) |
| 182 | if err != nil { |
| 183 | t.Fatalf("Unexpected error: %v", err) |
| 184 | } |
| 185 | |
| 186 | // Check that the consumer was created with the overflow priority policy. |
| 187 | info := c.CachedInfo() |
| 188 | if info.Config.PriorityPolicy != jetstream.PriorityPolicyOverflow { |
| 189 | t.Fatalf("Invalid priority policy; expected: %v; got: %v", jetstream.PriorityPolicyOverflow, info.Config.PriorityPolicy) |
| 190 | } |
| 191 | |
| 192 | for range 100 { |
| 193 | _, err = js.Publish(ctx, "FOO.bar", []byte("hello")) |
| 194 | if err != nil { |
| 195 | t.Fatalf("Unexpected error: %v", err) |
| 196 | } |
| 197 | } |
| 198 | |
| 199 | // We are below overflow, so we should not get any messages. |
| 200 | msgs, err := c.Fetch(10, jetstream.FetchMinPending(110), jetstream.FetchMaxWait(500*time.Millisecond), jetstream.FetchPriorityGroup("A")) |
| 201 | if err != nil { |
| 202 | t.Fatalf("Unexpected error: %v", err) |
| 203 | } |
| 204 | count := 0 |
| 205 | for msg := range msgs.Messages() { |
| 206 | msg.Ack() |
| 207 | count++ |
| 208 | } |
| 209 | if count != 0 { |
| 210 | t.Fatalf("Expected 0 messages, got %d", count) |
| 211 | } |
nothing calls this directly
no test coverage detected