MCPcopy
hub / github.com/nats-io/nats.go / testJetStream_ClusterMultiplePullSubscribe

Function testJetStream_ClusterMultiplePullSubscribe

test/js_test.go:7129–7201  ·  view source on GitHub ↗
(t *testing.T, subject string, srvs ...*jsServer)

Source from the content-addressed store, hash-verified

7127}
7128
7129func testJetStream_ClusterMultiplePullSubscribe(t *testing.T, subject string, srvs ...*jsServer) {
7130 srv := srvs[0]
7131 nc, js := jsClient(t, srv.Server)
7132 defer nc.Close()
7133
7134 var wg sync.WaitGroup
7135 ctx, done := context.WithTimeout(context.Background(), 2*time.Second)
7136 defer done()
7137
7138 size := 5
7139 subs := make([]*nats.Subscription, size)
7140 errCh := make(chan error, size)
7141 for i := 0; i < size; i++ {
7142 wg.Add(1)
7143 go func(n int) {
7144 defer wg.Done()
7145 var sub *nats.Subscription
7146 var err error
7147 for attempt := 0; attempt < 5; attempt++ {
7148 sub, err = js.PullSubscribe(subject, "shared")
7149 if err != nil {
7150 time.Sleep(1 * time.Second)
7151 continue
7152 }
7153 break
7154 }
7155 if err != nil {
7156 errCh <- err
7157 } else {
7158 subs[n] = sub
7159 }
7160 }(i)
7161 }
7162
7163 go func() {
7164 // Unblock the main context when done.
7165 wg.Wait()
7166 done()
7167 }()
7168
7169 wg.Wait()
7170 for i := 0; i < size*2; i++ {
7171 js.Publish(subject, []byte("test"))
7172 }
7173
7174 delivered := 0
7175 for i, sub := range subs {
7176 if sub == nil {
7177 continue
7178 }
7179 for attempt := 0; attempt < 4; attempt++ {
7180 _, err := sub.Fetch(1, nats.MaxWait(250*time.Millisecond))
7181 if err != nil {
7182 t.Logf("%v WARN: Timeout waiting for next message: %v", i, err)
7183 continue
7184 }
7185 delivered++
7186 break

Callers

nothing calls this directly

Calls 8

Fetch · Method · 0.95
Fatalf · Method · 0.80
jsClient · Function · 0.70
Add · Method · 0.65
Done · Method · 0.65
PullSubscribe · Method · 0.65
Publish · Method · 0.65
Close · Method · 0.45

Tested by

no test coverage detected