(t *testing.T)
| 235 | } |
| 236 | |
| 237 | func TestAutoUnsubWithParallelNextMsgCalls(t *testing.T) { |
| 238 | s := RunDefaultServer() |
| 239 | defer s.Shutdown() |
| 240 | |
| 241 | rch := make(chan bool, 1) |
| 242 | |
| 243 | nc, err := nats.Connect(nats.DefaultURL, |
| 244 | nats.ReconnectWait(50*time.Millisecond), |
| 245 | nats.ReconnectJitter(0, 0), |
| 246 | nats.ReconnectHandler(func(_ *nats.Conn) { rch <- true })) |
| 247 | if err != nil { |
| 248 | t.Fatalf("Unable to connect: %v", err) |
| 249 | } |
| 250 | defer nc.Close() |
| 251 | |
| 252 | numRoutines := 3 |
| 253 | max := 100 |
| 254 | total := max * 2 |
| 255 | received := int64(0) |
| 256 | |
| 257 | var wg sync.WaitGroup |
| 258 | |
| 259 | sub, err := nc.SubscribeSync("foo") |
| 260 | if err != nil { |
| 261 | t.Fatalf("Failed to subscribe: %v", err) |
| 262 | } |
| 263 | sub.AutoUnsubscribe(int(max)) |
| 264 | nc.Flush() |
| 265 | |
| 266 | wg.Add(numRoutines) |
| 267 | |
| 268 | for i := range numRoutines { |
| 269 | go func(s *nats.Subscription, idx int) { |
| 270 | for { |
| 271 | // The first to reach the max delivered will cause the |
| 272 | // subscription to be removed, which will kick out all |
| 273 | // other calls to NextMsg. So don't be afraid of the long |
| 274 | // timeout. |
| 275 | _, err := s.NextMsg(3 * time.Second) |
| 276 | if err != nil { |
| 277 | break |
| 278 | } |
| 279 | atomic.AddInt64(&received, 1) |
| 280 | } |
| 281 | wg.Done() |
| 282 | }(sub, i) |
| 283 | } |
| 284 | |
| 285 | msg := []byte("Hello") |
| 286 | for range max / 2 { |
| 287 | nc.Publish("foo", msg) |
| 288 | } |
| 289 | nc.Flush() |
| 290 | |
| 291 | s.Shutdown() |
| 292 | s = RunDefaultServer() |
| 293 | defer s.Shutdown() |
| 294 |
nothing calls this directly
no test coverage detected