MCP · copy
hub / github.com/nats-io/nats.go / TestAutoUnsubWithParallelNextMsgCalls

Function TestAutoUnsubWithParallelNextMsgCalls

test/sub_test.go:237–309  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

235}
236
237func TestAutoUnsubWithParallelNextMsgCalls(t *testing.T) {
238 s := RunDefaultServer()
239 defer s.Shutdown()
240
241 rch := make(chan bool, 1)
242
243 nc, err := nats.Connect(nats.DefaultURL,
244 nats.ReconnectWait(50*time.Millisecond),
245 nats.ReconnectJitter(0, 0),
246 nats.ReconnectHandler(func(_ *nats.Conn) { rch <- true }))
247 if err != nil {
248 t.Fatalf("Unable to connect: %v", err)
249 }
250 defer nc.Close()
251
252 numRoutines := 3
253 max := 100
254 total := max * 2
255 received := int64(0)
256
257 var wg sync.WaitGroup
258
259 sub, err := nc.SubscribeSync("foo")
260 if err != nil {
261 t.Fatalf("Failed to subscribe: %v", err)
262 }
263 sub.AutoUnsubscribe(int(max))
264 nc.Flush()
265
266 wg.Add(numRoutines)
267
268 for i := range numRoutines {
269 go func(s *nats.Subscription, idx int) {
270 for {
271 // The first to reach the max delivered will cause the
272 // subscription to be removed, which will kick out all
273 // other calls to NextMsg. So don't be afraid of the long
274 // timeout.
275 _, err := s.NextMsg(3 * time.Second)
276 if err != nil {
277 break
278 }
279 atomic.AddInt64(&received, 1)
280 }
281 wg.Done()
282 }(sub, i)
283 }
284
285 msg := []byte("Hello")
286 for range max / 2 {
287 nc.Publish("foo", msg)
288 }
289 nc.Flush()
290
291 s.Shutdown()
292 s = RunDefaultServer()
293 defer s.Shutdown()
294

Callers

nothing calls this directly

Calls 13

Connect (Method) · 0.80
ReconnectHandler (Method) · 0.80
Fatalf (Method) · 0.80
AutoUnsubscribe (Method) · 0.80
NextMsg (Method) · 0.80
RunDefaultServer (Function) · 0.70
Wait (Function) · 0.70
SubscribeSync (Method) · 0.65
Add (Method) · 0.65
Done (Method) · 0.65
Publish (Method) · 0.65
Close (Method) · 0.45

Tested by

no test coverage detected