MCPcopy
hub / github.com/nats-io/nats.go / TestSubscriptionEvents

Function TestSubscriptionEvents

test/sub_test.go:1658–1839  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

1656}
1657
1658func TestSubscriptionEvents(t *testing.T) {
1659 t.Run("default events", func(t *testing.T) {
1660 s := RunDefaultServer()
1661 defer s.Shutdown()
1662
1663 nc := NewDefaultConnection(t)
1664 // disable slow consumer prints
1665 nc.SetErrorHandler(func(c *nats.Conn, s *nats.Subscription, e error) {})
1666 defer nc.Close()
1667
1668 blockChan := make(chan struct{})
1669 sub, err := nc.Subscribe("foo", func(_ *nats.Msg) {
1670 // block in subscription callback
1671 // to force slow consumer
1672 <-blockChan
1673 })
1674 if err != nil {
1675 t.Fatalf("Error subscribing: %v", err)
1676 }
1677 sub.SetPendingLimits(10, 1024)
1678 status := sub.StatusChanged()
1679
1680 // initial status
1681 WaitOnChannel(t, status, nats.SubscriptionActive)
1682
1683 for range 11 {
1684 nc.Publish("foo", []byte("Hello"))
1685 }
1686 WaitOnChannel(t, status, nats.SubscriptionSlowConsumer)
1687 close(blockChan)
1688
1689 sub.Drain()
1690
1691 WaitOnChannel(t, status, nats.SubscriptionDraining)
1692
1693 WaitOnChannel(t, status, nats.SubscriptionClosed)
1694 })
1695
1696 t.Run("slow consumer event only", func(t *testing.T) {
1697 s := RunDefaultServer()
1698 defer s.Shutdown()
1699
1700 nc := NewDefaultConnection(t)
1701 defer nc.Close()
1702
1703 blockChan := make(chan struct{})
1704 sub, err := nc.Subscribe("foo", func(_ *nats.Msg) {
1705 // block in subscription callback
1706 // to force slow consumer
1707 <-blockChan
1708 })
1709 // disable slow consumer prints
1710 nc.SetErrorHandler(func(c *nats.Conn, s *nats.Subscription, e error) {})
1711 defer sub.Unsubscribe()
1712 if err != nil {
1713 t.Fatalf("Error subscribing: %v", err)
1714 }
1715 sub.SetPendingLimits(10, 1024)

Callers

nothing calls this directly

Calls 13

WaitOnChannelFunction · 0.85
SetErrorHandlerMethod · 0.80
FatalfMethod · 0.80
SetPendingLimitsMethod · 0.80
UnsubscribeMethod · 0.80
RunDefaultServerFunction · 0.70
NewDefaultConnectionFunction · 0.70
SubscribeMethod · 0.65
PublishMethod · 0.65
DrainMethod · 0.65
SubscribeSyncMethod · 0.65
CloseMethod · 0.45

Tested by

no test coverage detected