MCPcopy
hub / github.com/nats-io/nats.go / TestAsyncErrHandler

Function TestAsyncErrHandler

test/sub_test.go:572–649  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

570}
571
572func TestAsyncErrHandler(t *testing.T) {
573 s := RunDefaultServer()
574 defer s.Shutdown()
575
576 opts := nats.GetDefaultOptions()
577
578 nc, err := opts.Connect()
579 if err != nil {
580 t.Fatalf("Could not connect to server: %v\n", err)
581 }
582 defer nc.Close()
583
584 subj := "async_test"
585 bch := make(chan bool)
586
587 sub, err := nc.Subscribe(subj, func(_ *nats.Msg) {
588 // block to back us up..
589 <-bch
590 })
591 if err != nil {
592 t.Fatalf("Could not subscribe: %v\n", err)
593 }
594
595 limit := 10
596 toSend := 100
597
598 // Limit internal subchan length to trip condition easier.
599 sub.SetPendingLimits(limit, 1024)
600
601 ch := make(chan bool)
602
603 aeCalled := int64(0)
604
605 nc.SetErrorHandler(func(c *nats.Conn, s *nats.Subscription, e error) {
606 atomic.AddInt64(&aeCalled, 1)
607
608 if s != sub {
609 t.Fatal("Did not receive proper subscription")
610 }
611 if !errors.Is(e, nats.ErrSlowConsumer) {
612 t.Fatalf("Did not receive proper error: %v vs %v", e, nats.ErrSlowConsumer)
613 }
614 // Suppress additional calls
615 if atomic.LoadInt64(&aeCalled) == 1 {
616 // release the sub
617 defer close(bch)
618 // release the test
619 ch <- true
620 }
621 })
622
623 b := []byte("Hello World!")
624 // First one trips the ch wait in subscription callback.
625 nc.Publish(subj, b)
626 nc.Flush()
627 for range toSend {
628 nc.Publish(subj, b)
629 }

Callers

nothing calls this directly

Calls 13

Connect · Method · 0.80
Fatalf · Method · 0.80
SetPendingLimits · Method · 0.80
SetErrorHandler · Method · 0.80
Dropped · Method · 0.80
Unsubscribe · Method · 0.80
RunDefaultServer · Function · 0.70
Wait · Function · 0.70
Subscribe · Method · 0.65
Publish · Method · 0.65
Close · Method · 0.45
Is · Method · 0.45

Tested by

no test coverage detected