MCPcopy
hub / github.com/nats-io/nats.go / TestDrainQueueSub

Function TestDrainQueueSub

test/drain_test.go:77–126  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

75}
76
77func TestDrainQueueSub(t *testing.T) {
78 s := RunDefaultServer()
79 defer s.Shutdown()
80 nc := NewDefaultConnection(t)
81 defer nc.Close()
82
83 done := make(chan bool)
84 received := int32(0)
85 expected := int32(4096)
86 numSubs := int32(32)
87
88 checkDone := func() int32 {
89 rcvd := atomic.AddInt32(&received, 1)
90 if rcvd >= expected {
91 done <- true
92 }
93 return rcvd
94 }
95
96 callback := func(m *nats.Msg) {
97 rcvd := checkDone()
98 // Randomly replace this sub from time to time.
99 if rcvd%3 == 0 {
100 m.Sub.Drain()
101 // Create a new one that we will not drain.
102 nc.QueueSubscribe("foo", "bar", func(m *nats.Msg) { checkDone() })
103 }
104 }
105
106 for i := int32(0); i < numSubs; i++ {
107 _, err := nc.QueueSubscribe("foo", "bar", callback)
108 if err != nil {
109 t.Fatalf("Error creating subscription; %v", err)
110 }
111 }
112
113 for i := int32(0); i < expected; i++ {
114 nc.Publish("foo", []byte("Don't forget about me"))
115 }
116
117 select {
118 case <-done:
119 break
120 case <-time.After(5 * time.Second):
121 r := atomic.LoadInt32(&received)
122 if r != expected {
123 t.Fatalf("Did not receive all messages: %d of %d", r, expected)
124 }
125 }
126}
127
128func waitFor(t *testing.T, totalWait, sleepDur time.Duration, f func() error) {
129 t.Helper()

Callers

nothing calls this directly

Calls 7

Fatalf — Method · 0.80
RunDefaultServer — Function · 0.70
NewDefaultConnection — Function · 0.70
Drain — Method · 0.65
QueueSubscribe — Method · 0.65
Publish — Method · 0.65
Close — Method · 0.45

Tested by

no test coverage detected