(t *testing.T, subject string, srvs ...*jsServer)
| 7201 | } |
| 7202 | |
| 7203 | func testJetStream_ClusterMultipleFetchPullSubscribe(t *testing.T, subject string, srvs ...*jsServer) { |
| 7204 | srv := srvs[0] |
| 7205 | nc, js := jsClient(t, srv.Server) |
| 7206 | defer nc.Close() |
| 7207 | |
| 7208 | var wg sync.WaitGroup |
| 7209 | ctx, done := context.WithTimeout(context.Background(), 5*time.Second) |
| 7210 | defer done() |
| 7211 | |
| 7212 | // Setup a number of subscriptions with different inboxes that will be |
| 7213 | // fetching the messages in parallel. |
| 7214 | nsubs := 4 |
| 7215 | subs := make([]*nats.Subscription, nsubs) |
| 7216 | errCh := make(chan error, nsubs) |
| 7217 | var queues sync.Map |
| 7218 | for i := 0; i < nsubs; i++ { |
| 7219 | wg.Add(1) |
| 7220 | go func(n int) { |
| 7221 | defer wg.Done() |
| 7222 | var sub *nats.Subscription |
| 7223 | var err error |
| 7224 | sub, err = js.PullSubscribe(subject, "shared") |
| 7225 | if err != nil { |
| 7226 | errCh <- err |
| 7227 | } else { |
| 7228 | subs[n] = sub |
| 7229 | queues.Store(sub.Subject, make([]*nats.Msg, 0)) |
| 7230 | } |
| 7231 | }(i) |
| 7232 | } |
| 7233 | |
| 7234 | // Publishing of messages happens after the subscriptions are ready. |
| 7235 | // The subscribers fetch messages while these are still being produced, |
| 7236 | // so some fetches may find no messages available. |
| 7237 | wg.Wait() |
| 7238 | var ( |
| 7239 | total uint64 = 100 |
| 7240 | delivered uint64 |
| 7241 | batchSize = 2 |
| 7242 | ) |
| 7243 | go func() { |
| 7244 | for i := 0; i < int(total); i++ { |
| 7245 | js.Publish(subject, []byte(fmt.Sprintf("n:%v", i))) |
| 7246 | time.Sleep(1 * time.Millisecond) |
| 7247 | } |
| 7248 | }() |
| 7249 | |
| 7250 | ctx2, done2 := context.WithTimeout(ctx, 3*time.Second) |
| 7251 | defer done2() |
| 7252 | |
| 7253 | for _, psub := range subs { |
| 7254 | if psub == nil { |
| 7255 | continue |
| 7256 | } |
| 7257 | sub := psub |
| 7258 | subject := sub.Subject |
| 7259 | v, _ := queues.Load(sub.Subject) |
| 7260 | queue := v.([]*nats.Msg) |
nothing calls this directly
no test coverage detected