(t *testing.T)
| 1159 | } |
| 1160 | |
| 1161 | func TestConsumerPrioritized(t *testing.T) { |
| 1162 | t.Run("consume", func(t *testing.T) { |
| 1163 | srv := RunBasicJetStreamServer() |
| 1164 | defer shutdownJSServerAndRemoveStorage(t, srv) |
| 1165 | |
| 1166 | nc, err := nats.Connect(srv.ClientURL()) |
| 1167 | if err != nil { |
| 1168 | t.Fatalf("Unexpected error: %v", err) |
| 1169 | } |
| 1170 | defer nc.Close() |
| 1171 | |
| 1172 | ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) |
| 1173 | defer cancel() |
| 1174 | |
| 1175 | js, err := jetstream.New(nc) |
| 1176 | if err != nil { |
| 1177 | t.Fatalf("Unexpected error: %v", err) |
| 1178 | } |
| 1179 | |
| 1180 | s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) |
| 1181 | if err != nil { |
| 1182 | t.Fatalf("Unexpected error: %v", err) |
| 1183 | } |
| 1184 | c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ |
| 1185 | Durable: "cons", |
| 1186 | AckPolicy: jetstream.AckExplicitPolicy, |
| 1187 | Description: "test consumer", |
| 1188 | PriorityPolicy: jetstream.PriorityPolicyPrioritized, |
| 1189 | PriorityGroups: []string{"A"}, |
| 1190 | }) |
| 1191 | if err != nil { |
| 1192 | t.Fatalf("Unexpected error: %v", err) |
| 1193 | } |
| 1194 | // Check that consumer got proper priority policy |
| 1195 | info := c.CachedInfo() |
| 1196 | if info.Config.PriorityPolicy != jetstream.PriorityPolicyPrioritized { |
| 1197 | t.Fatalf("Invalid priority policy; expected: %v; got: %v", jetstream.PriorityPolicyPrioritized, info.Config.PriorityPolicy) |
| 1198 | } |
| 1199 | cons1, err := s.Consumer(ctx, "cons") |
| 1200 | if err != nil { |
| 1201 | t.Fatalf("Unexpected error: %v", err) |
| 1202 | } |
| 1203 | cons2, err := s.Consumer(ctx, "cons") |
| 1204 | if err != nil { |
| 1205 | t.Fatalf("Unexpected error: %v", err) |
| 1206 | } |
| 1207 | |
| 1208 | wg := sync.WaitGroup{} |
| 1209 | wg.Add(100) |
| 1210 | errs := make(chan error, 100) |
| 1211 | cc1, err := cons1.Consume(func(m jetstream.Msg) { |
| 1212 | if err := m.Ack(); err != nil { |
| 1213 | errs <- fmt.Errorf("Failed to ack message: %v", err) |
| 1214 | } |
| 1215 | wg.Done() |
| 1216 | }, jetstream.PullPriorityGroup("A"), jetstream.PullPrioritized(0), jetstream.PullMaxMessages(10)) |
| 1217 | if err != nil { |
| 1218 | t.Fatalf("Unexpected error: %v", err) |
nothing calls this directly
no test coverage detected