(t *testing.T, subject string, srvs ...*jsServer)
| 7636 | } |
| 7637 | |
| 7638 | func testJetStream_ClusterReconnectPullQueueSubscriber(t *testing.T, subject string, srvs ...*jsServer) { |
| 7639 | var ( |
| 7640 | recvd = make(map[string]int) |
| 7641 | recvdQ = make(map[int][]*nats.Msg) |
| 7642 | srvA = srvs[0] |
| 7643 | totalMsgs = 20 |
| 7644 | reconnected = make(chan struct{}, 2) |
| 7645 | reconnectDone bool |
| 7646 | ) |
| 7647 | nc, err := nats.Connect(srvA.ClientURL(), |
| 7648 | nats.ReconnectHandler(func(nc *nats.Conn) { |
| 7649 | reconnected <- struct{}{} |
| 7650 | |
| 7651 | // Bring back the server after the reconnect event. |
| 7652 | if !reconnectDone { |
| 7653 | reconnectDone = true |
| 7654 | srvA.Restart() |
| 7655 | } |
| 7656 | }), |
| 7657 | ) |
| 7658 | if err != nil { |
| 7659 | t.Error(err) |
| 7660 | } |
| 7661 | defer nc.Close() |
| 7662 | |
| 7663 | js, err := nc.JetStream() |
| 7664 | if err != nil { |
| 7665 | t.Error(err) |
| 7666 | } |
| 7667 | |
| 7668 | for i := 0; i < 10; i++ { |
| 7669 | payload := fmt.Sprintf("i:%d", i) |
| 7670 | _, err := js.Publish(subject, []byte(payload)) |
| 7671 | if err != nil { |
| 7672 | t.Errorf("Unexpected error: %v", err) |
| 7673 | } |
| 7674 | } |
| 7675 | |
| 7676 | subs := make([]*nats.Subscription, 0) |
| 7677 | |
| 7678 | for i := 0; i < 5; i++ { |
| 7679 | sub, err := js.PullSubscribe(subject, "d1", nats.PullMaxWaiting(5)) |
| 7680 | if err != nil { |
| 7681 | t.Fatal(err) |
| 7682 | } |
| 7683 | subs = append(subs, sub) |
| 7684 | } |
| 7685 | |
| 7686 | for i := 10; i < totalMsgs; i++ { |
| 7687 | payload := fmt.Sprintf("i:%d", i) |
| 7688 | _, err := js.Publish(subject, []byte(payload)) |
| 7689 | if err != nil { |
| 7690 | t.Errorf("Unexpected error: %v", err) |
| 7691 | } |
| 7692 | } |
| 7693 | |
| 7694 | ctx, done := context.WithTimeout(context.Background(), 10*time.Second) |
| 7695 | defer done() |
nothing calls this directly
no test coverage detected