MCPcopy
hub / github.com/nats-io/nats.go / TestChanSubscriber

Function TestChanSubscriber

test/sub_test.go:839–884  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

837}
838
839func TestChanSubscriber(t *testing.T) {
840 s := RunDefaultServer()
841 defer s.Shutdown()
842
843 nc := NewDefaultConnection(t)
844 defer nc.Close()
845
846 // Create our own channel.
847 ch := make(chan *nats.Msg, 128)
848
849 // Channel is mandatory
850 if _, err := nc.ChanSubscribe("foo", nil); err == nil {
851 t.Fatal("Creating subscription without channel should have failed")
852 }
853
854 _, err := nc.ChanSubscribe("foo", ch)
855 if err != nil {
856 t.Fatal("Failed to subscribe: ", err)
857 }
858
859 // Send some messages to ourselves.
860 total := 100
861 for range total {
862 nc.Publish("foo", []byte("Hello"))
863 }
864
865 received := 0
866 tm := time.NewTimer(5 * time.Second)
867 defer tm.Stop()
868
869 // Go ahead and receive
870 for {
871 select {
872 case _, ok := <-ch:
873 if !ok {
874 t.Fatalf("Got an error reading from channel")
875 }
876 case <-tm.C:
877 t.Fatalf("Timed out waiting on messages")
878 }
879 received++
880 if received >= total {
881 return
882 }
883 }
884}
885
886func TestChanQueueSubscriber(t *testing.T) {
887 s := RunDefaultServer()

Callers

nothing calls this directly

Calls 7

FatalfMethod · 0.80
RunDefaultServerFunction · 0.70
NewDefaultConnectionFunction · 0.70
ChanSubscribeMethod · 0.65
PublishMethod · 0.65
StopMethod · 0.65
CloseMethod · 0.45

Tested by

no test coverage detected