(t *testing.T)
| 760 | } |
| 761 | |
| 762 | func TestClientCheckBrokersHealth(t *testing.T) { |
| 763 | newConnectedBroker := func(t *testing.T) (*Broker, *net.TCPConn, func()) { |
| 764 | t.Helper() |
| 765 | |
| 766 | listener, err := net.Listen("tcp", "127.0.0.1:0") |
| 767 | require.NoError(t, err) |
| 768 | |
| 769 | accepted := make(chan *net.TCPConn, 1) |
| 770 | acceptErr := make(chan error, 1) |
| 771 | go func() { |
| 772 | conn, err := listener.Accept() |
| 773 | if err != nil { |
| 774 | acceptErr <- err |
| 775 | return |
| 776 | } |
| 777 | accepted <- conn.(*net.TCPConn) |
| 778 | }() |
| 779 | |
| 780 | conn, err := net.Dial("tcp", listener.Addr().String()) |
| 781 | require.NoError(t, err) |
| 782 | |
| 783 | var serverConn *net.TCPConn |
| 784 | select { |
| 785 | case serverConn = <-accepted: |
| 786 | case err := <-acceptErr: |
| 787 | require.NoError(t, err) |
| 788 | case <-time.After(time.Second): |
| 789 | require.FailNow(t, "timed out waiting for test broker connection") |
| 790 | } |
| 791 | |
| 792 | broker := NewBroker(listener.Addr().String()) |
| 793 | broker.conn = conn.(*net.TCPConn) |
| 794 | broker.metricRegistry = metrics.NewRegistry() |
| 795 | broker.opened.Store(true) |
| 796 | |
| 797 | cleanup := func() { |
| 798 | _ = listener.Close() |
| 799 | _ = serverConn.Close() |
| 800 | _ = broker.Close() |
| 801 | } |
| 802 | |
| 803 | return broker, serverConn, cleanup |
| 804 | } |
| 805 | |
| 806 | t.Run("does not wait for brokers that are opening", func(t *testing.T) { |
| 807 | broker := &Broker{id: 1, addr: "127.0.0.1:9092"} |
| 808 | broker.lock.Lock() |
| 809 | defer broker.lock.Unlock() |
| 810 | |
| 811 | client := &client{ |
| 812 | brokers: map[int32]*Broker{broker.ID(): broker}, |
| 813 | } |
| 814 | |
| 815 | done := make(chan struct{}) |
| 816 | go func() { |
| 817 | client.checkBrokersHealth() |
| 818 | close(done) |
| 819 | }() |
nothing calls this directly
no test coverage detected