MCPcopy
hub / github.com/nats-io/nats.go / TestAutoUnsubAndReconnect

Function TestAutoUnsubAndReconnect

test/sub_test.go:179–235  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

177}
178
179func TestAutoUnsubAndReconnect(t *testing.T) {
180 s := RunDefaultServer()
181 defer s.Shutdown()
182
183 rch := make(chan bool)
184
185 nc, err := nats.Connect(nats.DefaultURL,
186 nats.ReconnectWait(50*time.Millisecond),
187 nats.ReconnectJitter(0, 0),
188 nats.ReconnectHandler(func(_ *nats.Conn) { rch <- true }))
189 if err != nil {
190 t.Fatalf("Unable to connect: %v", err)
191 }
192 defer nc.Close()
193
194 received := int32(0)
195 max := int32(10)
196 sub, err := nc.Subscribe("foo", func(_ *nats.Msg) {
197 atomic.AddInt32(&received, 1)
198 })
199 if err != nil {
200 t.Fatalf("Failed to subscribe: %v", err)
201 }
202 sub.AutoUnsubscribe(int(max))
203
204 // Send less than the max
205 total := int(max / 2)
206 for range total {
207 nc.Publish("foo", []byte("Hello"))
208 }
209 nc.Flush()
210
211 // Restart the server
212 s.Shutdown()
213 s = RunDefaultServer()
214 defer s.Shutdown()
215
216 // and wait to reconnect
217 if err := Wait(rch); err != nil {
218 t.Fatal("Failed to get the reconnect cb")
219 }
220
221 // Now send more than the total max.
222 total = int(3 * max)
223 for range total {
224 nc.Publish("foo", []byte("Hello"))
225 }
226 nc.Flush()
227
228 // Wait a bit before checking.
229 time.Sleep(50 * time.Millisecond)
230
231 // We should have received only up-to-max messages.
232 if atomic.LoadInt32(&received) != max {
233 t.Fatalf("Received %d msgs, wanted only %d\n", received, max)
234 }
235}
236

Callers

nothing calls this directly

Calls 10

ConnectMethod · 0.80
ReconnectHandlerMethod · 0.80
FatalfMethod · 0.80
AutoUnsubscribeMethod · 0.80
RunDefaultServerFunction · 0.70
WaitFunction · 0.70
SubscribeMethod · 0.65
PublishMethod · 0.65
CloseMethod · 0.45
FlushMethod · 0.45

Tested by

no test coverage detected