MCPcopy
hub / github.com/nats-io/nats.go / TestChanQueueSubscriber

Function TestChanQueueSubscriber

test/sub_test.go:886–932  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

884}
885
886func TestChanQueueSubscriber(t *testing.T) {
887 s := RunDefaultServer()
888 defer s.Shutdown()
889
890 nc := NewDefaultConnection(t)
891 defer nc.Close()
892
893 // Create our own channel.
894 ch1 := make(chan *nats.Msg, 64)
895 ch2 := make(chan *nats.Msg, 64)
896
897 nc.ChanQueueSubscribe("foo", "bar", ch1)
898 nc.ChanQueueSubscribe("foo", "bar", ch2)
899
900 // Send some messages to ourselves.
901 total := 100
902 for range total {
903 nc.Publish("foo", []byte("Hello"))
904 }
905
906 received := 0
907 tm := time.NewTimer(5 * time.Second)
908 defer tm.Stop()
909
910 chk := func(ok bool) {
911 if !ok {
912 t.Fatalf("Got an error reading from channel")
913 } else {
914 received++
915 }
916 }
917
918 // Go ahead and receive
919 for {
920 select {
921 case _, ok := <-ch1:
922 chk(ok)
923 case _, ok := <-ch2:
924 chk(ok)
925 case <-tm.C:
926 t.Fatalf("Timed out waiting on messages")
927 }
928 if received >= total {
929 return
930 }
931 }
932}
933
934func TestChanSubscriberPendingLimits(t *testing.T) {
935 s := RunDefaultServer()

Callers

nothing calls this directly

Calls 7

FatalfMethod · 0.80
RunDefaultServerFunction · 0.70
NewDefaultConnectionFunction · 0.70
ChanQueueSubscribeMethod · 0.65
PublishMethod · 0.65
StopMethod · 0.65
CloseMethod · 0.45

Tested by

no test coverage detected