MCPcopy
hub / github.com/nats-io/nats.go / TestFullFlushChanDuringReconnect

Function TestFullFlushChanDuringReconnect

test/reconnect_test.go:498–561  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

496}
497
498func TestFullFlushChanDuringReconnect(t *testing.T) {
499 ts := startReconnectServer(t)
500 defer ts.Shutdown()
501
502 reconnectch := make(chan bool, 2)
503
504 opts := nats.GetDefaultOptions()
505 opts.Url = fmt.Sprintf("nats://127.0.0.1:%d", TEST_PORT)
506 opts.AllowReconnect = true
507 opts.MaxReconnect = 10000
508 opts.ReconnectWait = 100 * time.Millisecond
509 nats.ReconnectJitter(0, 0)(&opts)
510
511 opts.ReconnectedCB = func(_ *nats.Conn) {
512 reconnectch <- true
513 }
514
515 // Connect
516 nc, err := opts.Connect()
517 if err != nil {
518 t.Fatalf("Should have connected ok: %v", err)
519 }
520 defer nc.Close()
521
522 // Channel used to make the go routine sending messages to stop.
523 stop := make(chan bool)
524
525 // While connected, publish as fast as we can
526 go func() {
527 for i := 0; ; i++ {
528 _ = nc.Publish("foo", []byte("hello"))
529
530 // Make sure we are sending at least flushChanSize (1024) messages
531 // before potentially pausing.
532 if i%2000 == 0 {
533 select {
534 case <-stop:
535 return
536 default:
537 time.Sleep(100 * time.Millisecond)
538 }
539 }
540 }
541 }()
542
543 // Send a bit...
544 time.Sleep(500 * time.Millisecond)
545
546 // Shut down the server
547 ts.Shutdown()
548
549 // Continue sending while we are disconnected
550 time.Sleep(time.Second)
551
552 // Restart the server
553 ts = startReconnectServer(t)
554 defer ts.Shutdown()
555

Callers

nothing calls this directly

Calls 6

startReconnectServer · Function · 0.85
Connect · Method · 0.80
Fatalf · Method · 0.80
WaitTime · Function · 0.70
Publish · Method · 0.65
Close · Method · 0.45

Tested by

no test coverage detected