MCPcopy
hub / github.com/nats-io/nats.go / TestConsumerPrioritized

Function TestConsumerPrioritized

jetstream/test/consumer_test.go:1161–1438  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

1159}
1160
1161func TestConsumerPrioritized(t *testing.T) {
1162 t.Run("consume", func(t *testing.T) {
1163 srv := RunBasicJetStreamServer()
1164 defer shutdownJSServerAndRemoveStorage(t, srv)
1165
1166 nc, err := nats.Connect(srv.ClientURL())
1167 if err != nil {
1168 t.Fatalf("Unexpected error: %v", err)
1169 }
1170 defer nc.Close()
1171
1172 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
1173 defer cancel()
1174
1175 js, err := jetstream.New(nc)
1176 if err != nil {
1177 t.Fatalf("Unexpected error: %v", err)
1178 }
1179
1180 s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
1181 if err != nil {
1182 t.Fatalf("Unexpected error: %v", err)
1183 }
1184 c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
1185 Durable: "cons",
1186 AckPolicy: jetstream.AckExplicitPolicy,
1187 Description: "test consumer",
1188 PriorityPolicy: jetstream.PriorityPolicyPrioritized,
1189 PriorityGroups: []string{"A"},
1190 })
1191 if err != nil {
1192 t.Fatalf("Unexpected error: %v", err)
1193 }
1194 // Check that consumer got proper priority policy
1195 info := c.CachedInfo()
1196 if info.Config.PriorityPolicy != jetstream.PriorityPolicyPrioritized {
1197 t.Fatalf("Invalid priority policy; expected: %v; got: %v", jetstream.PriorityPolicyPrioritized, info.Config.PriorityPolicy)
1198 }
1199 cons1, err := s.Consumer(ctx, "cons")
1200 if err != nil {
1201 t.Fatalf("Unexpected error: %v", err)
1202 }
1203 cons2, err := s.Consumer(ctx, "cons")
1204 if err != nil {
1205 t.Fatalf("Unexpected error: %v", err)
1206 }
1207
1208 wg := sync.WaitGroup{}
1209 wg.Add(100)
1210 errs := make(chan error, 100)
1211 cc1, err := cons1.Consume(func(m jetstream.Msg) {
1212 if err := m.Ack(); err != nil {
1213 errs <- fmt.Errorf("Failed to ack message: %v", err)
1214 }
1215 wg.Done()
1216 }, jetstream.PullPriorityGroup("A"), jetstream.PullPrioritized(0), jetstream.PullMaxMessages(10))
1217 if err != nil {
1218 t.Fatalf("Unexpected error: %v", err)

Callers

nothing calls this directly

Calls 15

New (Function) · 0.92
PullPriorityGroup (TypeAlias) · 0.92
PullPrioritized (TypeAlias) · 0.92
PullMaxMessages (TypeAlias) · 0.92
FetchPriorityGroup (Function) · 0.92
FetchPrioritized (Function) · 0.92
FetchMaxWait (Function) · 0.92
Connect (Method) · 0.80
Fatalf (Method) · 0.80
Errorf (Method) · 0.80
RunBasicJetStreamServer (Function) · 0.70

Tested by

no test coverage detected