MCPcopy
hub / github.com/nats-io/nats.go / TestChanSubscriberPendingLimits

Function TestChanSubscriberPendingLimits

test/sub_test.go:934–1013  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

932}
933
934func TestChanSubscriberPendingLimits(t *testing.T) {
935 s := RunDefaultServer()
936 defer s.Shutdown()
937
938 nc := NewDefaultConnection(t)
939 defer nc.Close()
940 ncp := NewDefaultConnection(t)
941 defer ncp.Close()
942
943 // There was a defect that prevented to receive more than
944 // the default pending message limit. Trying to send more
945 // than this limit.
946 pending := 1000
947 total := pending + 100
948
949 for typeSubs := 0; typeSubs < 3; typeSubs++ {
950
951 func() {
952 // Create our own channel.
953 ch := make(chan *nats.Msg, total)
954
955 var err error
956 var sub *nats.Subscription
957 switch typeSubs {
958 case 0:
959 sub, err = nc.ChanSubscribe("foo", ch)
960 if err := sub.SetPendingLimits(pending, -1); err == nil {
961 t.Fatalf("Expected an error setting pending limits")
962 }
963 case 1:
964 sub, err = nc.ChanQueueSubscribe("foo", "bar", ch)
965 if err := sub.SetPendingLimits(pending, -1); err == nil {
966 t.Fatalf("Expected an error setting pending limits")
967 }
968 case 2:
969 sub, err = nc.QueueSubscribeSyncWithChan("foo", "bar", ch)
970 if err := sub.SetPendingLimits(pending, -1); err == nil {
971 t.Fatalf("Expected an error setting pending limits")
972 }
973 }
974 if err != nil {
975 t.Fatalf("Unexpected error on subscribe: %v", err)
976 }
977 defer sub.Unsubscribe()
978 nc.Flush()
979
980 // Send some messages
981 for range total {
982 if err := ncp.Publish("foo", []byte("Hello")); err != nil {
983 t.Fatalf("Unexpected error on publish: %v", err)
984 }
985 }
986
987 received := 0
988 tm := time.NewTimer(10 * time.Second)
989 defer tm.Stop()
990
991 chk := func(ok bool) {

Callers

nothing calls this directly

Calls 12

SetPendingLimitsMethod · 0.95
UnsubscribeMethod · 0.95
FatalfMethod · 0.80
RunDefaultServerFunction · 0.70
NewDefaultConnectionFunction · 0.70
ChanSubscribeMethod · 0.65
ChanQueueSubscribeMethod · 0.65
PublishMethod · 0.65
StopMethod · 0.65
CloseMethod · 0.45
FlushMethod · 0.45

Tested by

no test coverage detected