| 7127 | } |
| 7128 | |
| 7129 | func testJetStream_ClusterMultiplePullSubscribe(t *testing.T, subject string, srvs ...*jsServer) { |
| 7130 | srv := srvs[0] |
| 7131 | nc, js := jsClient(t, srv.Server) |
| 7132 | defer nc.Close() |
| 7133 | |
| 7134 | var wg sync.WaitGroup |
| 7135 | ctx, done := context.WithTimeout(context.Background(), 2*time.Second) |
| 7136 | defer done() |
| 7137 | |
| 7138 | size := 5 |
| 7139 | subs := make([]*nats.Subscription, size) |
| 7140 | errCh := make(chan error, size) |
| 7141 | for i := 0; i < size; i++ { |
| 7142 | wg.Add(1) |
| 7143 | go func(n int) { |
| 7144 | defer wg.Done() |
| 7145 | var sub *nats.Subscription |
| 7146 | var err error |
| 7147 | for attempt := 0; attempt < 5; attempt++ { |
| 7148 | sub, err = js.PullSubscribe(subject, "shared") |
| 7149 | if err != nil { |
| 7150 | time.Sleep(1 * time.Second) |
| 7151 | continue |
| 7152 | } |
| 7153 | break |
| 7154 | } |
| 7155 | if err != nil { |
| 7156 | errCh <- err |
| 7157 | } else { |
| 7158 | subs[n] = sub |
| 7159 | } |
| 7160 | }(i) |
| 7161 | } |
| 7162 | |
| 7163 | go func() { |
| 7164 | // Unblock the main context when done. |
| 7165 | wg.Wait() |
| 7166 | done() |
| 7167 | }() |
| 7168 | |
| 7169 | wg.Wait() |
| 7170 | for i := 0; i < size*2; i++ { |
| 7171 | js.Publish(subject, []byte("test")) |
| 7172 | } |
| 7173 | |
| 7174 | delivered := 0 |
| 7175 | for i, sub := range subs { |
| 7176 | if sub == nil { |
| 7177 | continue |
| 7178 | } |
| 7179 | for attempt := 0; attempt < 4; attempt++ { |
| 7180 | _, err := sub.Fetch(1, nats.MaxWait(250*time.Millisecond)) |
| 7181 | if err != nil { |
| 7182 | t.Logf("%v WARN: Timeout waiting for next message: %v", i, err) |
| 7183 | continue |
| 7184 | } |
| 7185 | delivered++ |
| 7186 | break |