MCPcopy
hub / github.com/nats-io/nats.go / testJetStream_ClusterReconnectDurableQueueSubscriber

Function testJetStream_ClusterReconnectDurableQueueSubscriber

test/js_test.go:7378–7523  ·  view source on GitHub ↗
(t *testing.T, subject string, srvs ...*jsServer)

Source from the content-addressed store, hash-verified

7376}
7377
7378func testJetStream_ClusterReconnectDurableQueueSubscriber(t *testing.T, subject string, srvs ...*jsServer) {
7379 var (
7380 srvA = srvs[0]
7381 totalMsgs = 20
7382 reconnected = make(chan struct{})
7383 reconnectDone bool
7384 )
7385 nc, err := nats.Connect(srvA.ClientURL(),
7386 nats.ReconnectHandler(func(nc *nats.Conn) {
7387 reconnected <- struct{}{}
7388
7389 // Bring back the server after the reconnect event.
7390 if !reconnectDone {
7391 reconnectDone = true
7392 srvA.Restart()
7393 }
7394 }),
7395 nats.ErrorHandler(func(_ *nats.Conn, sub *nats.Subscription, err error) {
7396 t.Logf("WARN: Got error %v", err)
7397 if info, ok := err.(*nats.ErrConsumerSequenceMismatch); ok {
7398 t.Logf("WARN: %+v", info)
7399 }
7400 // Take out this QueueSubscriber from the group.
7401 sub.Drain()
7402 }),
7403 )
7404 if err != nil {
7405 t.Error(err)
7406 }
7407
7408 defer nc.Close()
7409
7410 js, err := nc.JetStream()
7411 if err != nil {
7412 t.Error(err)
7413 }
7414
7415 for i := 0; i < 10; i++ {
7416 payload := fmt.Sprintf("i:%d", i)
7417 js.Publish(subject, []byte(payload))
7418 }
7419
7420 ctx, done := context.WithTimeout(context.Background(), 10*time.Second)
7421 defer done()
7422
7423 msgs := make(chan *nats.Msg, totalMsgs)
7424
7425 // Create some queue subscribers.
7426 for i := 0; i < 5; i++ {
7427 expected := totalMsgs
7428 dname := "dur"
7429 _, err = js.QueueSubscribe(subject, "wg", func(m *nats.Msg) {
7430 msgs <- m
7431
7432 count := len(msgs)
7433 switch {
7434 case count == 2:
7435 // Do not ack and wait for redelivery on reconnect.

Callers

nothing calls this directly

Calls 15

Connect · Method · 0.80
ReconnectHandler · Method · 0.80
ErrorHandler · Method · 0.80
JetStream · Method · 0.80
AckSync · Method · 0.80
Fatalf · Method · 0.80
Errorf · Method · 0.80
Drain · Method · 0.65
Error · Method · 0.65
Publish · Method · 0.65
QueueSubscribe · Method · 0.65
Done · Method · 0.65

Tested by

no test coverage detected