MCPcopy
hub / github.com/nats-io/nats.go / withJSClusterAndStream

Function withJSClusterAndStream

test/js_test.go:6524–6560  ·  view source on GitHub ↗
(t *testing.T, clusterName string, size int, stream *nats.StreamConfig, tfn func(t *testing.T, subject string, srvs ...*jsServer))

Source from the content-addressed store, hash-verified

6522}
6523
6524func withJSClusterAndStream(t *testing.T, clusterName string, size int, stream *nats.StreamConfig, tfn func(t *testing.T, subject string, srvs ...*jsServer)) {
6525 t.Helper()
6526
6527 withJSCluster(t, clusterName, size, func(t *testing.T, nodes ...*jsServer) {
6528 srvA := nodes[0]
6529 nc, err := nats.Connect(srvA.ClientURL())
6530 if err != nil {
6531 t.Error(err)
6532 }
6533 defer nc.Close()
6534
6535 timeout := time.Now().Add(10 * time.Second)
6536 for time.Now().Before(timeout) {
6537 jsm, err := nc.JetStream()
6538 if err != nil {
6539 t.Fatal(err)
6540 }
6541 _, err = jsm.AccountInfo()
6542 if err != nil {
6543 // Backoff for a bit until cluster and resources are ready.
6544 time.Sleep(500 * time.Millisecond)
6545 }
6546
6547 _, err = jsm.AddStream(stream)
6548 if err != nil {
6549 time.Sleep(500 * time.Millisecond)
6550 continue
6551 }
6552 break
6553 }
6554 if err != nil {
6555 t.Fatalf("Unexpected error creating stream: %v", err)
6556 }
6557
6558 tfn(t, stream.Name, nodes...)
6559 })
6560}
6561
6562func waitForJSReady(t *testing.T, nc *nats.Conn) {
6563 var err error

Calls 9

ConnectMethod · 0.80
JetStreamMethod · 0.80
FatalfMethod · 0.80
withJSClusterFunction · 0.70
ErrorMethod · 0.65
AddMethod · 0.65
AccountInfoMethod · 0.65
AddStreamMethod · 0.65
CloseMethod · 0.45

Tested by

no test coverage detected