(t *testing.T, clusterName string, size int, stream jetstream.StreamConfig, tfn func(t *testing.T, subject string, srvs ...*jsServer))
| 247 | } |
| 248 | |
| 249 | func withJSClusterAndStream(t *testing.T, clusterName string, size int, stream jetstream.StreamConfig, tfn func(t *testing.T, subject string, srvs ...*jsServer)) { |
| 250 | t.Helper() |
| 251 | |
| 252 | withJSCluster(t, clusterName, size, func(t *testing.T, nodes ...*jsServer) { |
| 253 | srvA := nodes[0] |
| 254 | nc, err := nats.Connect(srvA.ClientURL()) |
| 255 | if err != nil { |
| 256 | t.Error(err) |
| 257 | } |
| 258 | defer nc.Close() |
| 259 | |
| 260 | ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) |
| 261 | defer cancel() |
| 262 | jsm, err := jetstream.New(nc) |
| 263 | if err != nil { |
| 264 | t.Fatal(err) |
| 265 | } |
| 266 | CreateStream: |
| 267 | for { |
| 268 | select { |
| 269 | case <-ctx.Done(): |
| 270 | if err != nil { |
| 271 | t.Fatalf("Unexpected error creating stream: %v", err) |
| 272 | } |
| 273 | t.Fatalf("Unable to create stream on cluster") |
| 274 | case <-time.After(500 * time.Millisecond): |
| 275 | _, err = jsm.AccountInfo(ctx) |
| 276 | if err != nil { |
| 277 | // Backoff for a bit until cluster and resources are ready. |
| 278 | time.Sleep(500 * time.Millisecond) |
| 279 | } |
| 280 | _, err = jsm.CreateStream(ctx, stream) |
| 281 | if err != nil { |
| 282 | continue CreateStream |
| 283 | } |
| 284 | break CreateStream |
| 285 | } |
| 286 | } |
| 287 | |
| 288 | tfn(t, stream.Name, nodes...) |
| 289 | }) |
| 290 | } |
| 291 | |
| 292 | func withJSCluster(t *testing.T, clusterName string, size int, tfn func(t *testing.T, srvs ...*jsServer)) { |
| 293 | t.Helper() |
no test coverage detected