MCPcopy
hub / github.com/nats-io/nats.go / TestQueueChanQueueSubscriber

Function TestQueueChanQueueSubscriber

test/sub_test.go:1015–1072  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

1013}
1014
1015func TestQueueChanQueueSubscriber(t *testing.T) {
1016 s := RunDefaultServer()
1017 defer s.Shutdown()
1018
1019 nc := NewDefaultConnection(t)
1020 defer nc.Close()
1021
1022 // Create our own channel.
1023 ch1 := make(chan *nats.Msg, 64)
1024 ch2 := make(chan *nats.Msg, 64)
1025
1026 nc.QueueSubscribeSyncWithChan("foo", "bar", ch1)
1027 nc.QueueSubscribeSyncWithChan("foo", "bar", ch2)
1028
1029 // Send some messages to ourselves.
1030 total := 100
1031 for range total {
1032 nc.Publish("foo", []byte("Hello"))
1033 }
1034
1035 recv1 := 0
1036 recv2 := 0
1037 tm := time.NewTimer(5 * time.Second)
1038 defer tm.Stop()
1039 runTimer := time.NewTimer(500 * time.Millisecond)
1040 defer runTimer.Stop()
1041
1042 chk := func(ok bool, which int) {
1043 if !ok {
1044 t.Fatalf("Got an error reading from channel")
1045 } else {
1046 if which == 1 {
1047 recv1++
1048 } else {
1049 recv2++
1050 }
1051 }
1052 }
1053
1054 // Go ahead and receive
1055recvLoop:
1056 for {
1057 select {
1058 case _, ok := <-ch1:
1059 chk(ok, 1)
1060 case _, ok := <-ch2:
1061 chk(ok, 2)
1062 case <-tm.C:
1063 t.Fatalf("Timed out waiting on messages")
1064 case <-runTimer.C:
1065 break recvLoop
1066 }
1067 }
1068
1069 if recv1+recv2 > total {
1070 t.Fatalf("Received more messages than expected: %v vs %v", (recv1 + recv2), total)
1071 }
1072}

Callers

nothing calls this directly

Calls 7

FatalfMethod · 0.80
RunDefaultServerFunction · 0.70
NewDefaultConnectionFunction · 0.70
PublishMethod · 0.65
StopMethod · 0.65
CloseMethod · 0.45

Tested by

no test coverage detected