(t *testing.T)
| 649 | } |
| 650 | |
| 651 | func TestAsyncErrHandlerChanSubscription(t *testing.T) { |
| 652 | s := RunDefaultServer() |
| 653 | defer s.Shutdown() |
| 654 | |
| 655 | opts := nats.GetDefaultOptions() |
| 656 | |
| 657 | nc, err := opts.Connect() |
| 658 | if err != nil { |
| 659 | t.Fatalf("Could not connect to server: %v", err) |
| 660 | } |
| 661 | defer nc.Close() |
| 662 | |
| 663 | subj := "chan_test" |
| 664 | |
| 665 | limit := 10 |
| 666 | toSend := 100 |
| 667 | |
| 668 | // Create our own channel. |
| 669 | mch := make(chan *nats.Msg, limit) |
| 670 | sub, err := nc.ChanSubscribe(subj, mch) |
| 671 | if err != nil { |
| 672 | t.Fatalf("Could not subscribe: %v", err) |
| 673 | } |
| 674 | ch := make(chan bool) |
| 675 | aeCalled := int64(0) |
| 676 | |
| 677 | nc.SetErrorHandler(func(c *nats.Conn, s *nats.Subscription, e error) { |
| 678 | atomic.AddInt64(&aeCalled, 1) |
| 679 | if !errors.Is(e, nats.ErrSlowConsumer) { |
| 680 | t.Fatalf("Did not receive proper error: %v vs %v", |
| 681 | e, nats.ErrSlowConsumer) |
| 682 | } |
| 683 | // Suppress additional calls |
| 684 | if atomic.LoadInt64(&aeCalled) == 1 { |
| 685 | // release the test |
| 686 | ch <- true |
| 687 | } |
| 688 | }) |
| 689 | |
| 690 | b := []byte("Hello World!") |
| 691 | for range toSend { |
| 692 | nc.Publish(subj, b) |
| 693 | } |
| 694 | nc.Flush() |
| 695 | |
| 696 | if e := Wait(ch); e != nil { |
| 697 | t.Fatal("Failed to call async err handler") |
| 698 | } |
| 699 | // Make sure dropped stats is correct. |
| 700 | if d, _ := sub.Dropped(); d != toSend-limit { |
| 701 | t.Fatalf("Expected Dropped to be %d, go %d", toSend-limit, d) |
| 702 | } |
| 703 | if ae := atomic.LoadInt64(&aeCalled); ae != 1 { |
| 704 | t.Fatalf("Expected err handler to be called once, got %d", ae) |
| 705 | } |
| 706 | |
| 707 | sub.Unsubscribe() |
| 708 | if _, err := sub.Dropped(); err == nil { |
nothing calls this directly
no test coverage detected