MCPcopy
hub / github.com/nats-io/nats.go / testJetStream_ClusterMultipleFetchPullSubscribe

Function testJetStream_ClusterMultipleFetchPullSubscribe

test/js_test.go:7203–7334  ·  view source on GitHub ↗
(t *testing.T, subject string, srvs ...*jsServer)

Source from the content-addressed store, hash-verified

7201}
7202
7203func testJetStream_ClusterMultipleFetchPullSubscribe(t *testing.T, subject string, srvs ...*jsServer) {
7204 srv := srvs[0]
7205 nc, js := jsClient(t, srv.Server)
7206 defer nc.Close()
7207
7208 var wg sync.WaitGroup
7209 ctx, done := context.WithTimeout(context.Background(), 5*time.Second)
7210 defer done()
7211
7212 // Set up a number of subscriptions with different inboxes that will be
7213 // fetching the messages in parallel.
7214 nsubs := 4
7215 subs := make([]*nats.Subscription, nsubs)
7216 errCh := make(chan error, nsubs)
7217 var queues sync.Map
7218 for i := 0; i < nsubs; i++ {
7219 wg.Add(1)
7220 go func(n int) {
7221 defer wg.Done()
7222 var sub *nats.Subscription
7223 var err error
7224 sub, err = js.PullSubscribe(subject, "shared")
7225 if err != nil {
7226 errCh <- err
7227 } else {
7228 subs[n] = sub
7229 queues.Store(sub.Subject, make([]*nats.Msg, 0))
7230 }
7231 }(i)
7232 }
7233
7234 // Publishing of messages happens after the subscriptions are ready.
7235 // The subscribers will be fetching messages while these are being
7236 // produced so sometimes there are not going to be messages available.
7237 wg.Wait()
7238 var (
7239 total uint64 = 100
7240 delivered uint64
7241 batchSize = 2
7242 )
7243 go func() {
7244 for i := 0; i < int(total); i++ {
7245 js.Publish(subject, []byte(fmt.Sprintf("n:%v", i)))
7246 time.Sleep(1 * time.Millisecond)
7247 }
7248 }()
7249
7250 ctx2, done2 := context.WithTimeout(ctx, 3*time.Second)
7251 defer done2()
7252
7253 for _, psub := range subs {
7254 if psub == nil {
7255 continue
7256 }
7257 sub := psub
7258 subject := sub.Subject
7259 v, _ := queues.Load(sub.Subject)
7260 queue := v.([]*nats.Msg)

Callers

nothing calls this directly

Calls 12

Fetch (Method) · 0.95
Store (Method) · 0.80
Load (Method) · 0.80
Fatalf (Method) · 0.80
Range (Method) · 0.80
jsClient (Function) · 0.70
Add (Method) · 0.65
Done (Method) · 0.65
PullSubscribe (Method) · 0.65
Publish (Method) · 0.65
Error (Method) · 0.65
Close (Method) · 0.45

Tested by

no test coverage detected