MCPcopy
hub / github.com/nats-io/nats.go / TestJetStreamSubscribeReconnect

Function TestJetStreamSubscribeReconnect

test/js_internal_test.go:327–405  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

325}
326
327func TestJetStreamSubscribeReconnect(t *testing.T) {
328 s := RunBasicJetStreamServer()
329 defer shutdownJSServerAndRemoveStorage(t, s)
330
331 rch := make(chan struct{}, 1)
332 nc, err := nats.Connect(s.ClientURL(),
333 nats.ReconnectWait(50*time.Millisecond),
334 nats.ReconnectHandler(func(_ *nats.Conn) {
335 select {
336 case rch <- struct{}{}:
337 default:
338 }
339 }))
340 if err != nil {
341 t.Fatalf("Unexpected error: %v", err)
342 }
343 defer nc.Close()
344
345 js, err := nc.JetStream(nats.MaxWait(250 * time.Millisecond))
346 if err != nil {
347 t.Fatalf("Unexpected error: %v", err)
348 }
349
350 // Create the stream using our client API.
351 _, err = js.AddStream(&nats.StreamConfig{
352 Name: "TEST",
353 Subjects: []string{"foo"},
354 })
355 if err != nil {
356 t.Fatalf("Unexpected error: %v", err)
357 }
358
359 sub, err := js.SubscribeSync("foo", nats.Durable("bar"))
360 if err != nil {
361 t.Fatalf("Error on subscribe: %v", err)
362 }
363
364 sendAndReceive := func(msgContent string) {
365 t.Helper()
366 var ok bool
367 var err error
368 for i := 0; i < 5; i++ {
369 if _, err = js.Publish("foo", []byte(msgContent)); err != nil {
370 time.Sleep(250 * time.Millisecond)
371 continue
372 }
373 ok = true
374 break
375 }
376 if !ok {
377 t.Fatalf("Error on publish: %v", err)
378 }
379 msg, err := sub.NextMsg(time.Second)
380 if err != nil {
381 t.Fatal("Did not get message")
382 }
383 if string(msg.Data) != msgContent {
384 t.Fatalf("Unexpected content: %q", msg.Data)

Callers

nothing calls this directly

Calls 13

ConnectMethod · 0.80
ReconnectHandlerMethod · 0.80
FatalfMethod · 0.80
JetStreamMethod · 0.80
NextMsgMethod · 0.80
AckSyncMethod · 0.80
CloseTCPConnMethod · 0.80
RunBasicJetStreamServerFunction · 0.70
AddStreamMethod · 0.65
SubscribeSyncMethod · 0.65
PublishMethod · 0.65

Tested by

no test coverage detected