(t *testing.T, subject string, srvs ...*jsServer)
| 7376 | } |
| 7377 | |
| 7378 | func testJetStream_ClusterReconnectDurableQueueSubscriber(t *testing.T, subject string, srvs ...*jsServer) { |
| 7379 | var ( |
| 7380 | srvA = srvs[0] |
| 7381 | totalMsgs = 20 |
| 7382 | reconnected = make(chan struct{}) |
| 7383 | reconnectDone bool |
| 7384 | ) |
| 7385 | nc, err := nats.Connect(srvA.ClientURL(), |
| 7386 | nats.ReconnectHandler(func(nc *nats.Conn) { |
| 7387 | reconnected <- struct{}{} |
| 7388 | |
| 7389 | // Bring back the server after the reconnect event. |
| 7390 | if !reconnectDone { |
| 7391 | reconnectDone = true |
| 7392 | srvA.Restart() |
| 7393 | } |
| 7394 | }), |
| 7395 | nats.ErrorHandler(func(_ *nats.Conn, sub *nats.Subscription, err error) { |
| 7396 | t.Logf("WARN: Got error %v", err) |
| 7397 | if info, ok := err.(*nats.ErrConsumerSequenceMismatch); ok { |
| 7398 | t.Logf("WARN: %+v", info) |
| 7399 | } |
| 7400 | // Take out this QueueSubscriber from the group. |
| 7401 | sub.Drain() |
| 7402 | }), |
| 7403 | ) |
| 7404 | if err != nil { |
| 7405 | t.Error(err) |
| 7406 | } |
| 7407 | |
| 7408 | defer nc.Close() |
| 7409 | |
| 7410 | js, err := nc.JetStream() |
| 7411 | if err != nil { |
| 7412 | t.Error(err) |
| 7413 | } |
| 7414 | |
| 7415 | for i := 0; i < 10; i++ { |
| 7416 | payload := fmt.Sprintf("i:%d", i) |
| 7417 | js.Publish(subject, []byte(payload)) |
| 7418 | } |
| 7419 | |
| 7420 | ctx, done := context.WithTimeout(context.Background(), 10*time.Second) |
| 7421 | defer done() |
| 7422 | |
| 7423 | msgs := make(chan *nats.Msg, totalMsgs) |
| 7424 | |
| 7425 | // Create some queue subscribers. |
| 7426 | for i := 0; i < 5; i++ { |
| 7427 | expected := totalMsgs |
| 7428 | dname := "dur" |
| 7429 | _, err = js.QueueSubscribe(subject, "wg", func(m *nats.Msg) { |
| 7430 | msgs <- m |
| 7431 | |
| 7432 | count := len(msgs) |
| 7433 | switch { |
| 7434 | case count == 2: |
| 7435 | // Do not ack and wait for redelivery on reconnect. |
nothing calls this directly
no test coverage detected