| 7044 | } |
| 7045 | |
| 7046 | func testJetStream_ClusterMultipleQueueSubscribe(t *testing.T, subject string, srvs ...*jsServer) { |
| 7047 | srv := srvs[0] |
| 7048 | nc, err := nats.Connect(srv.ClientURL()) |
| 7049 | if err != nil { |
| 7050 | t.Fatal(err) |
| 7051 | } |
| 7052 | defer nc.Close() |
| 7053 | |
| 7054 | var wg sync.WaitGroup |
| 7055 | ctx, done := context.WithTimeout(context.Background(), 2*time.Second) |
| 7056 | defer done() |
| 7057 | |
| 7058 | js, err := nc.JetStream() |
| 7059 | if err != nil { |
| 7060 | t.Fatal(err) |
| 7061 | } |
| 7062 | |
| 7063 | size := 5 |
| 7064 | subs := make([]*nats.Subscription, size) |
| 7065 | errCh := make(chan error, size) |
| 7066 | |
| 7067 | // We are testing auto-bind here so create one first and expect others to bind to it. |
| 7068 | sub, err := js.QueueSubscribeSync(subject, "wq", nats.Durable("shared")) |
| 7069 | if err != nil { |
| 7070 | t.Fatalf("Unexpected error: %v", err) |
| 7071 | } |
| 7072 | subs[0] = sub |
| 7073 | |
| 7074 | for i := 1; i < size; i++ { |
| 7075 | wg.Add(1) |
| 7076 | go func(n int) { |
| 7077 | defer wg.Done() |
| 7078 | var sub *nats.Subscription |
| 7079 | var err error |
| 7080 | for attempt := 0; attempt < 5; attempt++ { |
| 7081 | sub, err = js.QueueSubscribeSync(subject, "wq", nats.Durable("shared")) |
| 7082 | if err != nil { |
| 7083 | time.Sleep(1 * time.Second) |
| 7084 | continue |
| 7085 | } |
| 7086 | break |
| 7087 | } |
| 7088 | if err != nil { |
| 7089 | errCh <- err |
| 7090 | } else { |
| 7091 | subs[n] = sub |
| 7092 | } |
| 7093 | }(i) |
| 7094 | } |
| 7095 | |
| 7096 | go func() { |
| 7097 | // Unblock the main context when done. |
| 7098 | wg.Wait() |
| 7099 | done() |
| 7100 | }() |
| 7101 | |
| 7102 | wg.Wait() |
| 7103 | for i := 0; i < size*2; i++ { |