(b *testing.B)
| 683 | } |
| 684 | |
| 685 | func BenchmarkNextMsgNoTimeout(b *testing.B) { |
| 686 | s := RunServerOnPort(TEST_PORT) |
| 687 | defer s.Shutdown() |
| 688 | |
| 689 | ncp, err := nats.Connect(fmt.Sprintf("127.0.0.1:%d", TEST_PORT)) |
| 690 | if err != nil { |
| 691 | b.Fatalf("Error connecting: %v", err) |
| 692 | } |
| 693 | ncs, err := nats.Connect(fmt.Sprintf("127.0.0.1:%d", TEST_PORT), nats.SyncQueueLen(b.N)) |
| 694 | if err != nil { |
| 695 | b.Fatalf("Error connecting: %v", err) |
| 696 | } |
| 697 | |
| 698 | // Test processing speed so no long subject or payloads. |
| 699 | subj := "a" |
| 700 | |
| 701 | sub, err := ncs.SubscribeSync(subj) |
| 702 | if err != nil { |
| 703 | b.Fatalf("Error subscribing: %v", err) |
| 704 | } |
| 705 | ncs.Flush() |
| 706 | |
| 707 | // Set it up so we can internally queue all the messages. |
| 708 | sub.SetPendingLimits(b.N, b.N*1000) |
| 709 | |
| 710 | for i := 0; i < b.N; i++ { |
| 711 | ncp.Publish(subj, nil) |
| 712 | } |
| 713 | ncp.Flush() |
| 714 | |
| 715 | // Wait for them to all be queued up, testing NextMsg not server here. |
| 716 | // Only wait at most one second. |
| 717 | wait := time.Now().Add(time.Second) |
| 718 | for time.Now().Before(wait) { |
| 719 | nm, _, err := sub.Pending() |
| 720 | if err != nil { |
| 721 | b.Fatalf("Error on Pending() - %v", err) |
| 722 | } |
| 723 | if nm >= b.N { |
| 724 | break |
| 725 | } |
| 726 | time.Sleep(10 * time.Millisecond) |
| 727 | } |
| 728 | |
| 729 | b.ResetTimer() |
| 730 | for i := 0; i < b.N; i++ { |
| 731 | if _, err := sub.NextMsg(10 * time.Millisecond); err != nil { |
| 732 | b.Fatalf("Error getting message[%d]: %v", i, err) |
| 733 | } |
| 734 | } |
| 735 | } |
| 736 | |
| 737 | func TestAuthErrorOnReconnect(t *testing.T) { |
| 738 | // This is a bit of an artificial test, but it is to demonstrate |
nothing calls this directly
no test coverage detected