(t *testing.T, subject string, srvs ...*jsServer)
| 7523 | } |
| 7524 | |
| 7525 | func testJetStream_ClusterReconnectDurablePushSubscriber(t *testing.T, subject string, srvs ...*jsServer) { |
| 7526 | var ( |
| 7527 | srvA = srvs[0] |
| 7528 | srvB = srvs[1] |
| 7529 | srvC = srvs[2] |
| 7530 | totalMsgs = 20 |
| 7531 | reconnected = make(chan struct{}) |
| 7532 | reconnectDone bool |
| 7533 | ) |
| 7534 | nc, err := nats.Connect(srvA.ClientURL(), |
| 7535 | nats.ReconnectHandler(func(nc *nats.Conn) { |
| 7536 | reconnected <- struct{}{} |
| 7537 | |
| 7538 | // Bring back the server after the reconnect event. |
| 7539 | if !reconnectDone { |
| 7540 | reconnectDone = true |
| 7541 | srvA.Restart() |
| 7542 | } |
| 7543 | }), |
| 7544 | ) |
| 7545 | if err != nil { |
| 7546 | t.Error(err) |
| 7547 | } |
| 7548 | |
| 7549 | // Drain to allow Ack responses to be published. |
| 7550 | defer nc.Drain() |
| 7551 | |
| 7552 | js, err := nc.JetStream() |
| 7553 | if err != nil { |
| 7554 | t.Error(err) |
| 7555 | } |
| 7556 | |
| 7557 | // Initial burst of messages. |
| 7558 | for i := 0; i < 10; i++ { |
| 7559 | payload := fmt.Sprintf("i:%d", i) |
| 7560 | js.Publish(subject, []byte(payload)) |
| 7561 | } |
| 7562 | |
| 7563 | // For now just confirm that we do receive all messages across restarts. |
| 7564 | ctx, done := context.WithTimeout(context.Background(), 10*time.Second) |
| 7565 | defer done() |
| 7566 | |
| 7567 | recvd := make(chan *nats.Msg, totalMsgs) |
| 7568 | expected := totalMsgs |
| 7569 | _, err = js.Subscribe(subject, func(m *nats.Msg) { |
| 7570 | recvd <- m |
| 7571 | |
| 7572 | if len(recvd) == expected { |
| 7573 | done() |
| 7574 | } |
| 7575 | }, nats.Durable("sd1")) |
| 7576 | if err != nil { |
| 7577 | t.Errorf("Unexpected error: %v", err) |
| 7578 | } |
| 7579 | |
| 7580 | timeout := time.Now().Add(3 * time.Second) |
| 7581 | for time.Now().Before(timeout) { |
| 7582 | if len(recvd) >= 2 { |
nothing calls this directly
no test coverage detected