MCPcopy
hub / github.com/nats-io/nats.go / withJSClusterAndStream

Function withJSClusterAndStream

jetstream/test/helper_test.go:249–290  ·  view source on GitHub ↗
(t *testing.T, clusterName string, size int, stream jetstream.StreamConfig, tfn func(t *testing.T, subject string, srvs ...*jsServer))

Source from the content-addressed store, hash-verified

247}
248
249func withJSClusterAndStream(t *testing.T, clusterName string, size int, stream jetstream.StreamConfig, tfn func(t *testing.T, subject string, srvs ...*jsServer)) {
250 t.Helper()
251
252 withJSCluster(t, clusterName, size, func(t *testing.T, nodes ...*jsServer) {
253 srvA := nodes[0]
254 nc, err := nats.Connect(srvA.ClientURL())
255 if err != nil {
256 t.Error(err)
257 }
258 defer nc.Close()
259
260 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
261 defer cancel()
262 jsm, err := jetstream.New(nc)
263 if err != nil {
264 t.Fatal(err)
265 }
266 CreateStream:
267 for {
268 select {
269 case <-ctx.Done():
270 if err != nil {
271 t.Fatalf("Unexpected error creating stream: %v", err)
272 }
273 t.Fatalf("Unable to create stream on cluster")
274 case <-time.After(500 * time.Millisecond):
275 _, err = jsm.AccountInfo(ctx)
276 if err != nil {
277 // Backoff for a bit until cluster and resources are ready.
278 time.Sleep(500 * time.Millisecond)
279 }
280 _, err = jsm.CreateStream(ctx, stream)
281 if err != nil {
282 continue CreateStream
283 }
284 break CreateStream
285 }
286 }
287
288 tfn(t, stream.Name, nodes...)
289 })
290}
291
292func withJSCluster(t *testing.T, clusterName string, size int, tfn func(t *testing.T, srvs ...*jsServer)) {
293 t.Helper()

Calls 9

New (Function) · 0.92
Connect (Method) · 0.80
Fatalf (Method) · 0.80
withJSCluster (Function) · 0.70
Error (Method) · 0.65
Done (Method) · 0.65
AccountInfo (Method) · 0.65
CreateStream (Method) · 0.65
Close (Method) · 0.45

Tested by

no test coverage detected