MCPcopy
hub / github.com/nats-io/nats.go / TestDrainConnectionAutoUnsub

Function TestDrainConnectionAutoUnsub

test/drain_test.go:342–405  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

340}
341
342func TestDrainConnectionAutoUnsub(t *testing.T) {
343 s := RunDefaultServer()
344 defer s.Shutdown()
345
346 errors := int32(0)
347 received := int32(0)
348 expected := int32(10)
349
350 done := make(chan bool)
351
352 closed := func(nc *nats.Conn) {
353 done <- true
354 }
355
356 errCb := func(nc *nats.Conn, s *nats.Subscription, err error) {
357 atomic.AddInt32(&errors, 1)
358 }
359
360 url := fmt.Sprintf("nats://127.0.0.1:%d", nats.DefaultPort)
361 nc, err := nats.Connect(url, nats.ErrorHandler(errCb), nats.ClosedHandler(closed))
362 if err != nil {
363 t.Fatalf("Failed to create default connection: %v", err)
364 }
365 defer nc.Close()
366
367 sub, err := nc.Subscribe("foo", func(_ *nats.Msg) {
368 // So they back up a bit in client and allow drain to do its thing.
369 time.Sleep(10 * time.Millisecond)
370 atomic.AddInt32(&received, 1)
371
372 })
373 if err != nil {
374 t.Fatalf("Error creating subscription; %v", err)
375 }
376
377 sub.AutoUnsubscribe(int(expected))
378
379 // Publish some messages
380 for i := 0; i < 50; i++ {
381 nc.Publish("foo", []byte("Only 10 please!"))
382 }
383 // Flush here so messages coming back into client.
384 nc.Flush()
385
386 // Now add drain state.
387 time.Sleep(10 * time.Millisecond)
388 nc.Drain()
389
390 // Wait for the closed state from nc
391 select {
392 case <-done:
393 errs := atomic.LoadInt32(&errors)
394 if errs > 0 {
395 t.Fatalf("Did not expect any errors, got %d", errs)
396 }
397 r := atomic.LoadInt32(&received)
398 if r != expected {
399 t.Fatalf("Did not receive all messages from Drain, %d vs %d", r, expected)

Callers

nothing calls this directly

Calls 11

ConnectMethod · 0.80
ErrorHandlerMethod · 0.80
ClosedHandlerMethod · 0.80
FatalfMethod · 0.80
AutoUnsubscribeMethod · 0.80
RunDefaultServerFunction · 0.70
SubscribeMethod · 0.65
PublishMethod · 0.65
DrainMethod · 0.65
CloseMethod · 0.45
FlushMethod · 0.45

Tested by

no test coverage detected