(t *testing.T, clusterName string, size int, stream *nats.StreamConfig, tfn func(t *testing.T, subject string, srvs ...*jsServer))
| 6522 | } |
| 6523 | |
| 6524 | func withJSClusterAndStream(t *testing.T, clusterName string, size int, stream *nats.StreamConfig, tfn func(t *testing.T, subject string, srvs ...*jsServer)) { |
| 6525 | t.Helper() |
| 6526 | |
| 6527 | withJSCluster(t, clusterName, size, func(t *testing.T, nodes ...*jsServer) { |
| 6528 | srvA := nodes[0] |
| 6529 | nc, err := nats.Connect(srvA.ClientURL()) |
| 6530 | if err != nil { |
| 6531 | t.Error(err) |
| 6532 | } |
| 6533 | defer nc.Close() |
| 6534 | |
| 6535 | timeout := time.Now().Add(10 * time.Second) |
| 6536 | for time.Now().Before(timeout) { |
| 6537 | jsm, err := nc.JetStream() |
| 6538 | if err != nil { |
| 6539 | t.Fatal(err) |
| 6540 | } |
| 6541 | _, err = jsm.AccountInfo() |
| 6542 | if err != nil { |
| 6543 | // Backoff for a bit until cluster and resources are ready. |
| 6544 | time.Sleep(500 * time.Millisecond) |
| 6545 | } |
| 6546 | |
| 6547 | _, err = jsm.AddStream(stream) |
| 6548 | if err != nil { |
| 6549 | time.Sleep(500 * time.Millisecond) |
| 6550 | continue |
| 6551 | } |
| 6552 | break |
| 6553 | } |
| 6554 | if err != nil { |
| 6555 | t.Fatalf("Unexpected error creating stream: %v", err) |
| 6556 | } |
| 6557 | |
| 6558 | tfn(t, stream.Name, nodes...) |
| 6559 | }) |
| 6560 | } |
| 6561 | |
| 6562 | func waitForJSReady(t *testing.T, nc *nats.Conn) { |
| 6563 | var err error |
no test coverage detected