(t *testing.T)
| 1013 | } |
| 1014 | |
| 1015 | func TestQueueChanQueueSubscriber(t *testing.T) { |
| 1016 | s := RunDefaultServer() |
| 1017 | defer s.Shutdown() |
| 1018 | |
| 1019 | nc := NewDefaultConnection(t) |
| 1020 | defer nc.Close() |
| 1021 | |
| 1022 | // Create our own channel. |
| 1023 | ch1 := make(chan *nats.Msg, 64) |
| 1024 | ch2 := make(chan *nats.Msg, 64) |
| 1025 | |
| 1026 | nc.QueueSubscribeSyncWithChan("foo", "bar", ch1) |
| 1027 | nc.QueueSubscribeSyncWithChan("foo", "bar", ch2) |
| 1028 | |
| 1029 | // Send some messages to ourselves. |
| 1030 | total := 100 |
| 1031 | for range total { |
| 1032 | nc.Publish("foo", []byte("Hello")) |
| 1033 | } |
| 1034 | |
| 1035 | recv1 := 0 |
| 1036 | recv2 := 0 |
| 1037 | tm := time.NewTimer(5 * time.Second) |
| 1038 | defer tm.Stop() |
| 1039 | runTimer := time.NewTimer(500 * time.Millisecond) |
| 1040 | defer runTimer.Stop() |
| 1041 | |
| 1042 | chk := func(ok bool, which int) { |
| 1043 | if !ok { |
| 1044 | t.Fatalf("Got an error reading from channel") |
| 1045 | } else { |
| 1046 | if which == 1 { |
| 1047 | recv1++ |
| 1048 | } else { |
| 1049 | recv2++ |
| 1050 | } |
| 1051 | } |
| 1052 | } |
| 1053 | |
| 1054 | // Go ahead and receive |
| 1055 | recvLoop: |
| 1056 | for { |
| 1057 | select { |
| 1058 | case _, ok := <-ch1: |
| 1059 | chk(ok, 1) |
| 1060 | case _, ok := <-ch2: |
| 1061 | chk(ok, 2) |
| 1062 | case <-tm.C: |
| 1063 | t.Fatalf("Timed out waiting on messages") |
| 1064 | case <-runTimer.C: |
| 1065 | break recvLoop |
| 1066 | } |
| 1067 | } |
| 1068 | |
| 1069 | if recv1+recv2 > total { |
| 1070 | t.Fatalf("Received more messages than expected: %v vs %v", (recv1 + recv2), total) |
| 1071 | } |
| 1072 | } |
nothing calls this directly
no test coverage detected