MCPcopy
hub / github.com/nats-io/nats.go / testJetStream_ClusterReconnectPullQueueSubscriber

Function testJetStream_ClusterReconnectPullQueueSubscriber

test/js_test.go:7638–7774  ·  view source on GitHub ↗
(t *testing.T, subject string, srvs ...*jsServer)

Source from the content-addressed store, hash-verified

7636}
7637
7638func testJetStream_ClusterReconnectPullQueueSubscriber(t *testing.T, subject string, srvs ...*jsServer) {
7639 var (
7640 recvd = make(map[string]int)
7641 recvdQ = make(map[int][]*nats.Msg)
7642 srvA = srvs[0]
7643 totalMsgs = 20
7644 reconnected = make(chan struct{}, 2)
7645 reconnectDone bool
7646 )
7647 nc, err := nats.Connect(srvA.ClientURL(),
7648 nats.ReconnectHandler(func(nc *nats.Conn) {
7649 reconnected <- struct{}{}
7650
7651 // Bring back the server after the reconnect event.
7652 if !reconnectDone {
7653 reconnectDone = true
7654 srvA.Restart()
7655 }
7656 }),
7657 )
7658 if err != nil {
7659 t.Error(err)
7660 }
7661 defer nc.Close()
7662
7663 js, err := nc.JetStream()
7664 if err != nil {
7665 t.Error(err)
7666 }
7667
7668 for i := 0; i < 10; i++ {
7669 payload := fmt.Sprintf("i:%d", i)
7670 _, err := js.Publish(subject, []byte(payload))
7671 if err != nil {
7672 t.Errorf("Unexpected error: %v", err)
7673 }
7674 }
7675
7676 subs := make([]*nats.Subscription, 0)
7677
7678 for i := 0; i < 5; i++ {
7679 sub, err := js.PullSubscribe(subject, "d1", nats.PullMaxWaiting(5))
7680 if err != nil {
7681 t.Fatal(err)
7682 }
7683 subs = append(subs, sub)
7684 }
7685
7686 for i := 10; i < totalMsgs; i++ {
7687 payload := fmt.Sprintf("i:%d", i)
7688 _, err := js.Publish(subject, []byte(payload))
7689 if err != nil {
7690 t.Errorf("Unexpected error: %v", err)
7691 }
7692 }
7693
7694 ctx, done := context.WithTimeout(context.Background(), 10*time.Second)
7695 defer done()

Callers

nothing calls this directly

Calls 15

ConnectMethod · 0.80
ReconnectHandlerMethod · 0.80
JetStreamMethod · 0.80
ErrorfMethod · 0.80
FatalfMethod · 0.80
AckSyncMethod · 0.80
ErrorMethod · 0.65
PublishMethod · 0.65
PullSubscribeMethod · 0.65
DoneMethod · 0.65
FetchMethod · 0.65
GetMethod · 0.65

Tested by

no test coverage detected