(t *testing.T)
| 75 | } |
| 76 | |
| 77 | func TestDrainQueueSub(t *testing.T) { |
| 78 | s := RunDefaultServer() |
| 79 | defer s.Shutdown() |
| 80 | nc := NewDefaultConnection(t) |
| 81 | defer nc.Close() |
| 82 | |
| 83 | done := make(chan bool) |
| 84 | received := int32(0) |
| 85 | expected := int32(4096) |
| 86 | numSubs := int32(32) |
| 87 | |
| 88 | checkDone := func() int32 { |
| 89 | rcvd := atomic.AddInt32(&received, 1) |
| 90 | if rcvd >= expected { |
| 91 | done <- true |
| 92 | } |
| 93 | return rcvd |
| 94 | } |
| 95 | |
| 96 | callback := func(m *nats.Msg) { |
| 97 | rcvd := checkDone() |
| 98 | // Randomly replace this sub from time to time. |
| 99 | if rcvd%3 == 0 { |
| 100 | m.Sub.Drain() |
| 101 | // Create a new one that we will not drain. |
| 102 | nc.QueueSubscribe("foo", "bar", func(m *nats.Msg) { checkDone() }) |
| 103 | } |
| 104 | } |
| 105 | |
| 106 | for i := int32(0); i < numSubs; i++ { |
| 107 | _, err := nc.QueueSubscribe("foo", "bar", callback) |
| 108 | if err != nil { |
| 109 | t.Fatalf("Error creating subscription; %v", err) |
| 110 | } |
| 111 | } |
| 112 | |
| 113 | for i := int32(0); i < expected; i++ { |
| 114 | nc.Publish("foo", []byte("Don't forget about me")) |
| 115 | } |
| 116 | |
| 117 | select { |
| 118 | case <-done: |
| 119 | break |
| 120 | case <-time.After(5 * time.Second): |
| 121 | r := atomic.LoadInt32(&received) |
| 122 | if r != expected { |
| 123 | t.Fatalf("Did not receive all messages: %d of %d", r, expected) |
| 124 | } |
| 125 | } |
| 126 | } |
| 127 | |
| 128 | func waitFor(t *testing.T, totalWait, sleepDur time.Duration, f func() error) { |
| 129 | t.Helper() |
nothing calls this directly
no test coverage detected