(t *testing.T)
| 570 | } |
| 571 | |
| 572 | func TestAsyncErrHandler(t *testing.T) { |
| 573 | s := RunDefaultServer() |
| 574 | defer s.Shutdown() |
| 575 | |
| 576 | opts := nats.GetDefaultOptions() |
| 577 | |
| 578 | nc, err := opts.Connect() |
| 579 | if err != nil { |
| 580 | t.Fatalf("Could not connect to server: %v\n", err) |
| 581 | } |
| 582 | defer nc.Close() |
| 583 | |
| 584 | subj := "async_test" |
| 585 | bch := make(chan bool) |
| 586 | |
| 587 | sub, err := nc.Subscribe(subj, func(_ *nats.Msg) { |
| 588 | // block to back us up.. |
| 589 | <-bch |
| 590 | }) |
| 591 | if err != nil { |
| 592 | t.Fatalf("Could not subscribe: %v\n", err) |
| 593 | } |
| 594 | |
| 595 | limit := 10 |
| 596 | toSend := 100 |
| 597 | |
| 598 | // Limit internal subchan length to trip condition easier. |
| 599 | sub.SetPendingLimits(limit, 1024) |
| 600 | |
| 601 | ch := make(chan bool) |
| 602 | |
| 603 | aeCalled := int64(0) |
| 604 | |
| 605 | nc.SetErrorHandler(func(c *nats.Conn, s *nats.Subscription, e error) { |
| 606 | atomic.AddInt64(&aeCalled, 1) |
| 607 | |
| 608 | if s != sub { |
| 609 | t.Fatal("Did not receive proper subscription") |
| 610 | } |
| 611 | if !errors.Is(e, nats.ErrSlowConsumer) { |
| 612 | t.Fatalf("Did not receive proper error: %v vs %v", e, nats.ErrSlowConsumer) |
| 613 | } |
| 614 | // Suppress additional calls |
| 615 | if atomic.LoadInt64(&aeCalled) == 1 { |
| 616 | // release the sub |
| 617 | defer close(bch) |
| 618 | // release the test |
| 619 | ch <- true |
| 620 | } |
| 621 | }) |
| 622 | |
| 623 | b := []byte("Hello World!") |
| 624 | // First one trips the ch wait in subscription callback. |
| 625 | nc.Publish(subj, b) |
| 626 | nc.Flush() |
| 627 | for range toSend { |
| 628 | nc.Publish(subj, b) |
| 629 | } |
nothing calls this directly
no test coverage detected