(t *testing.T)
| 309 | } |
| 310 | |
| 311 | func TestAutoUnsubscribeFromCallback(t *testing.T) { |
| 312 | s := RunDefaultServer() |
| 313 | defer s.Shutdown() |
| 314 | |
| 315 | nc, err := nats.Connect(nats.DefaultURL) |
| 316 | if err != nil { |
| 317 | t.Fatalf("Unable to connect: %v", err) |
| 318 | } |
| 319 | defer nc.Close() |
| 320 | |
| 321 | max := 10 |
| 322 | resetUnsubMark := int64(max / 2) |
| 323 | limit := int64(100) |
| 324 | received := int64(0) |
| 325 | |
| 326 | msg := []byte("Hello") |
| 327 | |
| 328 | // Auto-unsubscribe within the callback with a value lower |
| 329 | // than what was already received. |
| 330 | |
| 331 | sub, err := nc.Subscribe("foo", func(m *nats.Msg) { |
| 332 | r := atomic.AddInt64(&received, 1) |
| 333 | if r == resetUnsubMark { |
| 334 | m.Sub.AutoUnsubscribe(int(r - 1)) |
| 335 | nc.Flush() |
| 336 | } |
| 337 | if r == limit { |
| 338 | // Something went wrong... fail now |
| 339 | t.Fatal("Got more messages than expected") |
| 340 | } |
| 341 | nc.Publish("foo", msg) |
| 342 | }) |
| 343 | if err != nil { |
| 344 | t.Fatalf("Failed to subscribe: %v", err) |
| 345 | } |
| 346 | sub.AutoUnsubscribe(int(max)) |
| 347 | nc.Flush() |
| 348 | |
| 349 | // Trigger the first message, the other are sent from the callback. |
| 350 | nc.Publish("foo", msg) |
| 351 | nc.Flush() |
| 352 | |
| 353 | waitFor(t, time.Second, 100*time.Millisecond, func() error { |
| 354 | recv := atomic.LoadInt64(&received) |
| 355 | if recv != resetUnsubMark { |
| 356 | return fmt.Errorf("Wrong number of received messages. Original max was %v reset to %v, actual received: %v", |
| 357 | max, resetUnsubMark, recv) |
| 358 | } |
| 359 | return nil |
| 360 | }) |
| 361 | |
| 362 | // Now check with AutoUnsubscribe with higher value than original |
| 363 | received = int64(0) |
| 364 | newMax := int64(2 * max) |
| 365 | |
| 366 | sub, err = nc.Subscribe("foo", func(m *nats.Msg) { |
| 367 | r := atomic.AddInt64(&received, 1) |
| 368 | if r == resetUnsubMark { |
nothing calls this directly
no test coverage detected