We want to make sure that many concurrent queue durable consumer requests are handled correctly. Exactly one request should win, and the others should share the winner's delivery subject.
(t *testing.T)
| 10490 | // We want to make sure that many concurrent queue durable consumer requests are handled correctly. |
| 10491 | // Exactly one request should win, and the others should share the winner's delivery subject. |
| 10492 | func TestJetStreamConcurrentQueueDurablePushConsumers(t *testing.T) { |
| 10493 | s := RunBasicJetStreamServer() |
| 10494 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 10495 | |
| 10496 | nc, js := jsClient(t, s) |
| 10497 | defer nc.Close() |
| 10498 | |
| 10499 | var err error |
| 10500 | |
| 10501 | // Create stream. |
| 10502 | _, err = js.AddStream(&nats.StreamConfig{ |
| 10503 | Name: "TEST", |
| 10504 | Subjects: []string{"foo"}, |
| 10505 | }) |
| 10506 | if err != nil { |
| 10507 | t.Fatalf("Unexpected error: %v", err) |
| 10508 | } |
| 10509 | |
| 10510 | // Now create 10 durables concurrently. |
| 10511 | subs := make([]*nats.Subscription, 0, 10) |
| 10512 | var wg sync.WaitGroup |
| 10513 | mx := &sync.Mutex{} |
| 10514 | |
| 10515 | for i := 0; i < 10; i++ { |
| 10516 | wg.Add(1) |
| 10517 | go func() { |
| 10518 | defer wg.Done() |
| 10519 | sub, _ := js.QueueSubscribeSync("foo", "bar") |
| 10520 | mx.Lock() |
| 10521 | subs = append(subs, sub) |
| 10522 | mx.Unlock() |
| 10523 | }() |
| 10524 | } |
| 10525 | // Wait for all the consumers. |
| 10526 | wg.Wait() |
| 10527 | |
| 10528 | si, err := js.StreamInfo("TEST") |
| 10529 | if err != nil { |
| 10530 | t.Fatalf("Unexpected error: %v", err) |
| 10531 | } |
| 10532 | if si.State.Consumers != 1 { |
| 10533 | t.Fatalf("Expected exactly one consumer, got %d", si.State.Consumers) |
| 10534 | } |
| 10535 | |
| 10536 | // Now send some messages and make sure they are distributed. |
| 10537 | total := 1000 |
| 10538 | for i := 0; i < total; i++ { |
| 10539 | js.Publish("foo", []byte("Hello")) |
| 10540 | } |
| 10541 | |
| 10542 | timeout := time.Now().Add(2 * time.Second) |
| 10543 | got := 0 |
| 10544 | for time.Now().Before(timeout) { |
| 10545 | got = 0 |
| 10546 | for _, sub := range subs { |
| 10547 | pending, _, _ := sub.Pending() |
| 10548 | // If a single sub has the total, then probably something is not right. |
| 10549 | if pending == total { |
nothing calls this directly
no test coverage detected