nolint:paralleltest
(t *testing.T)
| 884 | |
| 885 | //nolint:paralleltest |
| 886 | func TestClientController(t *testing.T) { |
| 887 | seedBroker := NewMockBroker(t, 1) |
| 888 | defer seedBroker.Close() |
| 889 | controllerBroker := NewMockBroker(t, 2) |
| 890 | defer controllerBroker.Close() |
| 891 | |
| 892 | seedBroker.SetHandlerByMap(map[string]MockResponse{ |
| 893 | "MetadataRequest": NewMockMetadataResponse(t). |
| 894 | SetController(controllerBroker.BrokerID()). |
| 895 | SetBroker(seedBroker.Addr(), seedBroker.BrokerID()). |
| 896 | SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()), |
| 897 | }) |
| 898 | |
| 899 | cfg := NewTestConfig() |
| 900 | |
| 901 | // test kafka version greater than 0.10.0.0 |
| 902 | t.Run("V0_10_0_0", func(t *testing.T) { |
| 903 | cfg.Version = V0_10_0_0 |
| 904 | client1, err := NewClient([]string{seedBroker.Addr()}, cfg) |
| 905 | if err != nil { |
| 906 | t.Fatal(err) |
| 907 | } |
| 908 | defer safeClose(t, client1) |
| 909 | broker, err := client1.Controller() |
| 910 | if err != nil { |
| 911 | t.Fatal(err) |
| 912 | } |
| 913 | if broker.Addr() != controllerBroker.Addr() { |
| 914 | t.Errorf("Expected controller to have address %s, found %s", controllerBroker.Addr(), broker.Addr()) |
| 915 | } |
| 916 | }) |
| 917 | |
| 918 | // test kafka version earlier than 0.10.0.0 |
| 919 | t.Run("V0_9_0_1", func(t *testing.T) { |
| 920 | cfg.Version = V0_9_0_1 |
| 921 | client2, err := NewClient([]string{seedBroker.Addr()}, cfg) |
| 922 | if err != nil { |
| 923 | t.Fatal(err) |
| 924 | } |
| 925 | defer safeClose(t, client2) |
| 926 | if _, err = client2.Controller(); !errors.Is(err, ErrUnsupportedVersion) { |
| 927 | t.Errorf("Expected Controller() to return %s, found %s", ErrUnsupportedVersion, err) |
| 928 | } |
| 929 | }) |
| 930 | } |
| 931 | |
| 932 | func TestClientMetadataTimeout(t *testing.T) { |
| 933 | tests := []struct { |
nothing calls this directly
no test coverage detected