| 496 | } |
| 497 | |
| 498 | func TestFullFlushChanDuringReconnect(t *testing.T) { |
| 499 | ts := startReconnectServer(t) |
| 500 | defer ts.Shutdown() |
| 501 | |
| 502 | reconnectch := make(chan bool, 2) |
| 503 | |
| 504 | opts := nats.GetDefaultOptions() |
| 505 | opts.Url = fmt.Sprintf("nats://127.0.0.1:%d", TEST_PORT) |
| 506 | opts.AllowReconnect = true |
| 507 | opts.MaxReconnect = 10000 |
| 508 | opts.ReconnectWait = 100 * time.Millisecond |
| 509 | nats.ReconnectJitter(0, 0)(&opts) |
| 510 | |
| 511 | opts.ReconnectedCB = func(_ *nats.Conn) { |
| 512 | reconnectch <- true |
| 513 | } |
| 514 | |
| 515 | // Connect |
| 516 | nc, err := opts.Connect() |
| 517 | if err != nil { |
| 518 | t.Fatalf("Should have connected ok: %v", err) |
| 519 | } |
| 520 | defer nc.Close() |
| 521 | |
| 522 | // Channel used to signal the goroutine that publishes messages to stop. |
| 523 | stop := make(chan bool) |
| 524 | |
| 525 | // While connected, publish as fast as we can |
| 526 | go func() { |
| 527 | for i := 0; ; i++ { |
| 528 | _ = nc.Publish("foo", []byte("hello")) |
| 529 | |
| 530 | // Make sure we are sending at least flushChanSize (1024) messages |
| 531 | // before potentially pausing. |
| 532 | if i%2000 == 0 { |
| 533 | select { |
| 534 | case <-stop: |
| 535 | return |
| 536 | default: |
| 537 | time.Sleep(100 * time.Millisecond) |
| 538 | } |
| 539 | } |
| 540 | } |
| 541 | }() |
| 542 | |
| 543 | // Send a bit... |
| 544 | time.Sleep(500 * time.Millisecond) |
| 545 | |
| 546 | // Shut down the server |
| 547 | ts.Shutdown() |
| 548 | |
| 549 | // Continue sending while we are disconnected |
| 550 | time.Sleep(time.Second) |
| 551 | |
| 552 | // Restart the server |
| 553 | ts = startReconnectServer(t) |
| 554 | defer ts.Shutdown() |
| 555 | |