MCPcopy
hub / github.com/nats-io/nats.go / testJetStream_ClusterMultipleQueueSubscribe

Function testJetStream_ClusterMultipleQueueSubscribe

test/js_test.go:7046–7127  ·  view source on GitHub ↗
(t *testing.T, subject string, srvs ...*jsServer)

Source from the content-addressed store, hash-verified

7044}
7045
7046func testJetStream_ClusterMultipleQueueSubscribe(t *testing.T, subject string, srvs ...*jsServer) {
7047 srv := srvs[0]
7048 nc, err := nats.Connect(srv.ClientURL())
7049 if err != nil {
7050 t.Fatal(err)
7051 }
7052 defer nc.Close()
7053
7054 var wg sync.WaitGroup
7055 ctx, done := context.WithTimeout(context.Background(), 2*time.Second)
7056 defer done()
7057
7058 js, err := nc.JetStream()
7059 if err != nil {
7060 t.Fatal(err)
7061 }
7062
7063 size := 5
7064 subs := make([]*nats.Subscription, size)
7065 errCh := make(chan error, size)
7066
7067 // We are testing auto-bind here so create one first and expect others to bind to it.
7068 sub, err := js.QueueSubscribeSync(subject, "wq", nats.Durable("shared"))
7069 if err != nil {
7070 t.Fatalf("Unexpected error: %v", err)
7071 }
7072 subs[0] = sub
7073
7074 for i := 1; i < size; i++ {
7075 wg.Add(1)
7076 go func(n int) {
7077 defer wg.Done()
7078 var sub *nats.Subscription
7079 var err error
7080 for attempt := 0; attempt < 5; attempt++ {
7081 sub, err = js.QueueSubscribeSync(subject, "wq", nats.Durable("shared"))
7082 if err != nil {
7083 time.Sleep(1 * time.Second)
7084 continue
7085 }
7086 break
7087 }
7088 if err != nil {
7089 errCh <- err
7090 } else {
7091 subs[n] = sub
7092 }
7093 }(i)
7094 }
7095
7096 go func() {
7097 // Unblock the main context when done.
7098 wg.Wait()
7099 done()
7100 }()
7101
7102 wg.Wait()
7103 for i := 0; i < size*2; i++ {

Callers

nothing calls this directly

Calls 9

Pending (Method) · 0.95
Connect (Method) · 0.80
JetStream (Method) · 0.80
Fatalf (Method) · 0.80
QueueSubscribeSync (Method) · 0.65
Add (Method) · 0.65
Done (Method) · 0.65
Publish (Method) · 0.65
Close (Method) · 0.45

Tested by

no test coverage detected