MCPcopy
hub / github.com/nats-io/nats.go / testJetStream_ClusterReconnectDurablePushSubscriber

Function testJetStream_ClusterReconnectDurablePushSubscriber

test/js_test.go:7525–7636  ·  view source on GitHub ↗
(t *testing.T, subject string, srvs ...*jsServer)

Source from the content-addressed store, hash-verified

7523}
7524
7525func testJetStream_ClusterReconnectDurablePushSubscriber(t *testing.T, subject string, srvs ...*jsServer) {
7526 var (
7527 srvA = srvs[0]
7528 srvB = srvs[1]
7529 srvC = srvs[2]
7530 totalMsgs = 20
7531 reconnected = make(chan struct{})
7532 reconnectDone bool
7533 )
7534 nc, err := nats.Connect(srvA.ClientURL(),
7535 nats.ReconnectHandler(func(nc *nats.Conn) {
7536 reconnected <- struct{}{}
7537
7538 // Bring back the server after the reconnect event.
7539 if !reconnectDone {
7540 reconnectDone = true
7541 srvA.Restart()
7542 }
7543 }),
7544 )
7545 if err != nil {
7546 t.Error(err)
7547 }
7548
7549 // Drain to allow Ack responses to be published.
7550 defer nc.Drain()
7551
7552 js, err := nc.JetStream()
7553 if err != nil {
7554 t.Error(err)
7555 }
7556
7557 // Initial burst of messages.
7558 for i := 0; i < 10; i++ {
7559 payload := fmt.Sprintf("i:%d", i)
7560 js.Publish(subject, []byte(payload))
7561 }
7562
7563 // For now just confirm that we do receive all messages across restarts.
7564 ctx, done := context.WithTimeout(context.Background(), 10*time.Second)
7565 defer done()
7566
7567 recvd := make(chan *nats.Msg, totalMsgs)
7568 expected := totalMsgs
7569 _, err = js.Subscribe(subject, func(m *nats.Msg) {
7570 recvd <- m
7571
7572 if len(recvd) == expected {
7573 done()
7574 }
7575 }, nats.Durable("sd1"))
7576 if err != nil {
7577 t.Errorf("Unexpected error: %v", err)
7578 }
7579
7580 timeout := time.Now().Add(3 * time.Second)
7581 for time.Now().Before(timeout) {
7582 if len(recvd) >= 2 {

Callers

nothing calls this directly

Calls 12

ConnectMethod · 0.80
ReconnectHandlerMethod · 0.80
JetStreamMethod · 0.80
ErrorfMethod · 0.80
ConnectedUrlMethod · 0.80
ErrorMethod · 0.65
DrainMethod · 0.65
PublishMethod · 0.65
SubscribeMethod · 0.65
AddMethod · 0.65
DoneMethod · 0.65
RestartMethod · 0.45

Tested by

no test coverage detected