MCPcopy
hub / github.com/nats-io/nats.go / TestDrain

Function TestDrain

test/drain_test.go:29–75  ·  view source on GitHub ↗

Drain can be very useful for graceful shutdown of subscribers, especially queue subscribers.

(t *testing.T)

Source from the content-addressed store, hash-verified

27// Drain can be very useful for graceful shutdown of subscribers.
28// Especially queue subscribers.
29func TestDrain(t *testing.T) {
30 s := RunDefaultServer()
31 defer s.Shutdown()
32 nc := NewDefaultConnection(t)
33 defer nc.Close()
34
35 done := make(chan bool)
36 received := int32(0)
37 expected := int32(100)
38
39 cb := func(_ *nats.Msg) {
40 // Allow this to back up.
41 time.Sleep(time.Millisecond)
42 rcvd := atomic.AddInt32(&received, 1)
43 if rcvd >= expected {
44 done <- true
45 }
46 }
47
48 sub, err := nc.Subscribe("foo", cb)
49 if err != nil {
50 t.Fatalf("Error creating subscription; %v", err)
51 }
52
53 for i := int32(0); i < expected; i++ {
54 nc.Publish("foo", []byte("Don't forget about me"))
55 }
56
57 // Drain it and make sure we receive all messages.
58 sub.Drain()
59 if !sub.IsDraining() {
60 t.Fatalf("Expected to be draining")
61 }
62 select {
63 case <-done:
64 break
65 case <-time.After(5 * time.Second):
66 r := atomic.LoadInt32(&received)
67 if r != expected {
68 t.Fatalf("Did not receive all messages: %d of %d", r, expected)
69 }
70 }
71 time.Sleep(100 * time.Millisecond)
72 if sub.IsDraining() {
73 t.Fatalf("Expected to be done draining")
74 }
75}
76
77func TestDrainQueueSub(t *testing.T) {
78 s := RunDefaultServer()

Callers

nothing calls this directly

Calls 8

FatalfMethod · 0.80
RunDefaultServerFunction · 0.70
NewDefaultConnectionFunction · 0.70
SubscribeMethod · 0.65
PublishMethod · 0.65
DrainMethod · 0.65
CloseMethod · 0.45
IsDrainingMethod · 0.45

Tested by

no test coverage detected