hub / github.com/nats-io/nats.go / TestJetStreamConcurrentQueueDurablePushConsumers

Function TestJetStreamConcurrentQueueDurablePushConsumers

test/js_test.go:10492–10560

Verifies that many concurrent queue subscribe requests for the same durable push consumer are handled correctly. One request should win and create the consumer; the others should share the delivery subject of the winner.

(t *testing.T)

Source from the content-addressed store, hash-verified

// We want to make sure we do the right thing with lots of concurrent queue durable consumer requests.
// One should win and the others should share the delivery subject with the first one who wins.
func TestJetStreamConcurrentQueueDurablePushConsumers(t *testing.T) {
	s := RunBasicJetStreamServer()
	defer shutdownJSServerAndRemoveStorage(t, s)

	nc, js := jsClient(t, s)
	defer nc.Close()

	var err error

	// Create stream.
	_, err = js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo"},
	})
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}

	// Now create 10 durables concurrently.
	subs := make([]*nats.Subscription, 0, 10)
	var wg sync.WaitGroup
	mx := &sync.Mutex{}

	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sub, _ := js.QueueSubscribeSync("foo", "bar")
			mx.Lock()
			subs = append(subs, sub)
			mx.Unlock()
		}()
	}
	// Wait for all the consumers.
	wg.Wait()

	si, err := js.StreamInfo("TEST")
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	if si.State.Consumers != 1 {
		t.Fatalf("Expected exactly one consumer, got %d", si.State.Consumers)
	}

	// Now send some messages and make sure they are distributed.
	total := 1000
	for i := 0; i < total; i++ {
		js.Publish("foo", []byte("Hello"))
	}

	timeout := time.Now().Add(2 * time.Second)
	got := 0
	for time.Now().Before(timeout) {
		got = 0
		for _, sub := range subs {
			pending, _, _ := sub.Pending()
			// If a single sub has the total, then probably something is not right.
			if pending == total {

Callers

nothing calls this directly

Calls 12

Fatalf · Method · 0.80
Pending · Method · 0.80
RunBasicJetStreamServer · Function · 0.70
jsClient · Function · 0.70
AddStream · Method · 0.65
Add · Method · 0.65
Done · Method · 0.65
QueueSubscribeSync · Method · 0.65
StreamInfo · Method · 0.65
Publish · Method · 0.65
Close · Method · 0.45

Tested by

no test coverage detected