(t *testing.T)
| 177 | } |
| 178 | |
| 179 | func TestAutoUnsubAndReconnect(t *testing.T) { |
| 180 | s := RunDefaultServer() |
| 181 | defer s.Shutdown() |
| 182 | |
| 183 | rch := make(chan bool) |
| 184 | |
| 185 | nc, err := nats.Connect(nats.DefaultURL, |
| 186 | nats.ReconnectWait(50*time.Millisecond), |
| 187 | nats.ReconnectJitter(0, 0), |
| 188 | nats.ReconnectHandler(func(_ *nats.Conn) { rch <- true })) |
| 189 | if err != nil { |
| 190 | t.Fatalf("Unable to connect: %v", err) |
| 191 | } |
| 192 | defer nc.Close() |
| 193 | |
| 194 | received := int32(0) |
| 195 | max := int32(10) |
| 196 | sub, err := nc.Subscribe("foo", func(_ *nats.Msg) { |
| 197 | atomic.AddInt32(&received, 1) |
| 198 | }) |
| 199 | if err != nil { |
| 200 | t.Fatalf("Failed to subscribe: %v", err) |
| 201 | } |
| 202 | sub.AutoUnsubscribe(int(max)) |
| 203 | |
| 204 | // Send less than the max |
| 205 | total := int(max / 2) |
| 206 | for range total { |
| 207 | nc.Publish("foo", []byte("Hello")) |
| 208 | } |
| 209 | nc.Flush() |
| 210 | |
| 211 | // Restart the server |
| 212 | s.Shutdown() |
| 213 | s = RunDefaultServer() |
| 214 | defer s.Shutdown() |
| 215 | |
| 216 | // and wait to reconnect |
| 217 | if err := Wait(rch); err != nil { |
| 218 | t.Fatal("Failed to get the reconnect cb") |
| 219 | } |
| 220 | |
| 221 | // Now send more than the total max. |
| 222 | total = int(3 * max) |
| 223 | for range total { |
| 224 | nc.Publish("foo", []byte("Hello")) |
| 225 | } |
| 226 | nc.Flush() |
| 227 | |
| 228 | // Wait a bit before checking. |
| 229 | time.Sleep(50 * time.Millisecond) |
| 230 | |
| 231 | // We should have received only up-to-max messages. |
| 232 | if atomic.LoadInt32(&received) != max { |
| 233 | t.Fatalf("Received %d msgs, wanted only %d\n", received, max) |
| 234 | } |
| 235 | } |
| 236 |
nothing calls this directly
no test coverage detected