MCPcopy
hub / github.com/nats-io/nats.go / TestAutoUnsubscribeFromCallback

Function TestAutoUnsubscribeFromCallback

test/sub_test.go:311–396  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

309}
310
311func TestAutoUnsubscribeFromCallback(t *testing.T) {
312 s := RunDefaultServer()
313 defer s.Shutdown()
314
315 nc, err := nats.Connect(nats.DefaultURL)
316 if err != nil {
317 t.Fatalf("Unable to connect: %v", err)
318 }
319 defer nc.Close()
320
321 max := 10
322 resetUnsubMark := int64(max / 2)
323 limit := int64(100)
324 received := int64(0)
325
326 msg := []byte("Hello")
327
328 // Auto-unsubscribe within the callback with a value lower
329 // than what was already received.
330
331 sub, err := nc.Subscribe("foo", func(m *nats.Msg) {
332 r := atomic.AddInt64(&received, 1)
333 if r == resetUnsubMark {
334 m.Sub.AutoUnsubscribe(int(r - 1))
335 nc.Flush()
336 }
337 if r == limit {
338 // Something went wrong... fail now
339 t.Fatal("Got more messages than expected")
340 }
341 nc.Publish("foo", msg)
342 })
343 if err != nil {
344 t.Fatalf("Failed to subscribe: %v", err)
345 }
346 sub.AutoUnsubscribe(int(max))
347 nc.Flush()
348
349 // Trigger the first message; the others are sent from the callback.
350 nc.Publish("foo", msg)
351 nc.Flush()
352
353 waitFor(t, time.Second, 100*time.Millisecond, func() error {
354 recv := atomic.LoadInt64(&received)
355 if recv != resetUnsubMark {
356 return fmt.Errorf("Wrong number of received messages. Original max was %v reset to %v, actual received: %v",
357 max, resetUnsubMark, recv)
358 }
359 return nil
360 })
361
362 // Now check AutoUnsubscribe with a higher value than the original
363 received = int64(0)
364 newMax := int64(2 * max)
365
366 sub, err = nc.Subscribe("foo", func(m *nats.Msg) {
367 r := atomic.AddInt64(&received, 1)
368 if r == resetUnsubMark {

Callers

nothing calls this directly

Calls 10

waitFor · Function · 0.85
Connect · Method · 0.80
Fatalf · Method · 0.80
AutoUnsubscribe · Method · 0.80
Errorf · Method · 0.80
RunDefaultServer · Function · 0.70
Subscribe · Method · 0.65
Publish · Method · 0.65
Close · Method · 0.45
Flush · Method · 0.45

Tested by

no test coverage detected