MCPcopy
hub / github.com/nats-io/nats.go / TestConsumerOverflow

Function TestConsumerOverflow

jetstream/test/consumer_test.go:154–550  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

152}
153
154func TestConsumerOverflow(t *testing.T) {
155 t.Run("fetch", func(t *testing.T) {
156 srv := RunBasicJetStreamServer()
157 defer shutdownJSServerAndRemoveStorage(t, srv)
158
159 nc, err := nats.Connect(srv.ClientURL())
160 if err != nil {
161 t.Fatalf("Unexpected error: %v", err)
162 }
163 defer nc.Close()
164
165 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
166 defer cancel()
167
168 js, err := jetstream.New(nc)
169 if err != nil {
170 t.Fatalf("Unexpected error: %v", err)
171 }
172
173 s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
174 if err != nil {
175 t.Fatalf("Unexpected error: %v", err)
176 }
177
178 c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
179 PriorityPolicy: jetstream.PriorityPolicyOverflow,
180 PriorityGroups: []string{"A"},
181 })
182 if err != nil {
183 t.Fatalf("Unexpected error: %v", err)
184 }
185
186 // Check that the consumer was created with the expected priority policy
187 info := c.CachedInfo()
188 if info.Config.PriorityPolicy != jetstream.PriorityPolicyOverflow {
189 t.Fatalf("Invalid priority policy; expected: %v; got: %v", jetstream.PriorityPolicyOverflow, info.Config.PriorityPolicy)
190 }
191
192 for range 100 {
193 _, err = js.Publish(ctx, "FOO.bar", []byte("hello"))
194 if err != nil {
195 t.Fatalf("Unexpected error: %v", err)
196 }
197 }
198
199 // We are below overflow, so we should not get any messages.
200 msgs, err := c.Fetch(10, jetstream.FetchMinPending(110), jetstream.FetchMaxWait(500*time.Millisecond), jetstream.FetchPriorityGroup("A"))
201 if err != nil {
202 t.Fatalf("Unexpected error: %v", err)
203 }
204 count := 0
205 for msg := range msgs.Messages() {
206 msg.Ack()
207 count++
208 }
209 if count != 0 {
210 t.Fatalf("Expected 0 messages, got %d", count)
211 }

Callers

nothing calls this directly

Calls 15

New · Function · 0.92
FetchMinPending · Function · 0.92
FetchMaxWait · Function · 0.92
FetchPriorityGroup · Function · 0.92
FetchMinAckPending · Function · 0.92
PullPriorityGroup · TypeAlias · 0.92
PullMinPending · TypeAlias · 0.92
PullMaxMessages · TypeAlias · 0.92
PullExpiry · TypeAlias · 0.92
PullMinAckPending · TypeAlias · 0.92
Connect · Method · 0.80
Fatalf · Method · 0.80

Tested by

no test coverage detected