(t *testing.T)
| 884 | } |
| 885 | |
| 886 | func TestChanQueueSubscriber(t *testing.T) { |
| 887 | s := RunDefaultServer() |
| 888 | defer s.Shutdown() |
| 889 | |
| 890 | nc := NewDefaultConnection(t) |
| 891 | defer nc.Close() |
| 892 | |
| 893 | // Create our own channel. |
| 894 | ch1 := make(chan *nats.Msg, 64) |
| 895 | ch2 := make(chan *nats.Msg, 64) |
| 896 | |
| 897 | nc.ChanQueueSubscribe("foo", "bar", ch1) |
| 898 | nc.ChanQueueSubscribe("foo", "bar", ch2) |
| 899 | |
| 900 | // Send some messages to ourselves. |
| 901 | total := 100 |
| 902 | for range total { |
| 903 | nc.Publish("foo", []byte("Hello")) |
| 904 | } |
| 905 | |
| 906 | received := 0 |
| 907 | tm := time.NewTimer(5 * time.Second) |
| 908 | defer tm.Stop() |
| 909 | |
| 910 | chk := func(ok bool) { |
| 911 | if !ok { |
| 912 | t.Fatalf("Got an error reading from channel") |
| 913 | } else { |
| 914 | received++ |
| 915 | } |
| 916 | } |
| 917 | |
| 918 | // Go ahead and receive |
| 919 | for { |
| 920 | select { |
| 921 | case _, ok := <-ch1: |
| 922 | chk(ok) |
| 923 | case _, ok := <-ch2: |
| 924 | chk(ok) |
| 925 | case <-tm.C: |
| 926 | t.Fatalf("Timed out waiting on messages") |
| 927 | } |
| 928 | if received >= total { |
| 929 | return |
| 930 | } |
| 931 | } |
| 932 | } |
| 933 | |
| 934 | func TestChanSubscriberPendingLimits(t *testing.T) { |
| 935 | s := RunDefaultServer() |
nothing calls this directly
no test coverage detected