(t *testing.T)
| 340 | } |
| 341 | |
| 342 | func TestDrainConnectionAutoUnsub(t *testing.T) { |
| 343 | s := RunDefaultServer() |
| 344 | defer s.Shutdown() |
| 345 | |
| 346 | errors := int32(0) |
| 347 | received := int32(0) |
| 348 | expected := int32(10) |
| 349 | |
| 350 | done := make(chan bool) |
| 351 | |
| 352 | closed := func(nc *nats.Conn) { |
| 353 | done <- true |
| 354 | } |
| 355 | |
| 356 | errCb := func(nc *nats.Conn, s *nats.Subscription, err error) { |
| 357 | atomic.AddInt32(&errors, 1) |
| 358 | } |
| 359 | |
| 360 | url := fmt.Sprintf("nats://127.0.0.1:%d", nats.DefaultPort) |
| 361 | nc, err := nats.Connect(url, nats.ErrorHandler(errCb), nats.ClosedHandler(closed)) |
| 362 | if err != nil { |
| 363 | t.Fatalf("Failed to create default connection: %v", err) |
| 364 | } |
| 365 | defer nc.Close() |
| 366 | |
| 367 | sub, err := nc.Subscribe("foo", func(_ *nats.Msg) { |
| 368 | // So they back up a bit in client and allow drain to do its thing. |
| 369 | time.Sleep(10 * time.Millisecond) |
| 370 | atomic.AddInt32(&received, 1) |
| 371 | |
| 372 | }) |
| 373 | if err != nil { |
| 374 | t.Fatalf("Error creating subscription; %v", err) |
| 375 | } |
| 376 | |
| 377 | sub.AutoUnsubscribe(int(expected)) |
| 378 | |
| 379 | // Publish some messages |
| 380 | for i := 0; i < 50; i++ { |
| 381 | nc.Publish("foo", []byte("Only 10 please!")) |
| 382 | } |
| 383 | // Flush here so messages coming back into client. |
| 384 | nc.Flush() |
| 385 | |
| 386 | // Now add drain state. |
| 387 | time.Sleep(10 * time.Millisecond) |
| 388 | nc.Drain() |
| 389 | |
| 390 | // Wait for the closed state from nc |
| 391 | select { |
| 392 | case <-done: |
| 393 | errs := atomic.LoadInt32(&errors) |
| 394 | if errs > 0 { |
| 395 | t.Fatalf("Did not expect any errors, got %d", errs) |
| 396 | } |
| 397 | r := atomic.LoadInt32(&received) |
| 398 | if r != expected { |
| 399 | t.Fatalf("Did not receive all messages from Drain, %d vs %d", r, expected) |
nothing calls this directly
no test coverage detected