(t *testing.T)
| 405 | } |
| 406 | |
| 407 | func TestDrainConnLastError(t *testing.T) { |
| 408 | s := RunDefaultServer() |
| 409 | defer s.Shutdown() |
| 410 | |
| 411 | done := make(chan bool, 1) |
| 412 | closedCb := func(nc *nats.Conn) { |
| 413 | done <- true |
| 414 | } |
| 415 | |
| 416 | nc, err := nats.Connect(nats.DefaultURL, |
| 417 | nats.ClosedHandler(closedCb), |
| 418 | nats.DrainTimeout(time.Millisecond)) |
| 419 | if err != nil { |
| 420 | t.Fatalf("Failed to create default connection: %v", err) |
| 421 | } |
| 422 | defer nc.Close() |
| 423 | |
| 424 | // Override default handler for test. |
| 425 | nc.SetErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, _ error) {}) |
| 426 | |
| 427 | wg := sync.WaitGroup{} |
| 428 | wg.Add(1) |
| 429 | if _, err := nc.Subscribe("foo", func(_ *nats.Msg) { |
| 430 | // So they back up a bit in client to make drain timeout |
| 431 | time.Sleep(100 * time.Millisecond) |
| 432 | wg.Done() |
| 433 | |
| 434 | }); err != nil { |
| 435 | t.Fatalf("Error creating subscription; %v", err) |
| 436 | } |
| 437 | |
| 438 | if err := nc.Publish("foo", []byte("msg")); err != nil { |
| 439 | t.Fatalf("Error on publish: %v", err) |
| 440 | } |
| 441 | if err := nc.Drain(); err != nil { |
| 442 | t.Fatalf("Error on drain: %v", err) |
| 443 | } |
| 444 | |
| 445 | select { |
| 446 | case <-done: |
| 447 | if e := nc.LastError(); e == nil || e != nats.ErrDrainTimeout { |
| 448 | t.Fatalf("Expected last error to be set to %v, got %v", nats.ErrDrainTimeout, e) |
| 449 | } |
| 450 | case <-time.After(2 * time.Second): |
| 451 | t.Fatalf("Timeout waiting for closed state for connection") |
| 452 | } |
| 453 | |
| 454 | // Wait for subscription callback to return |
| 455 | wg.Wait() |
| 456 | } |
| 457 | |
| 458 | func TestDrainConnDuringReconnect(t *testing.T) { |
| 459 | s := RunDefaultServer() |
nothing calls this directly
no test coverage detected