(t *testing.T)
| 1016 | } |
| 1017 | |
| 1018 | func TestFuncAdminUpdateFeatures(t *testing.T) { |
| 1019 | t.Parallel() |
| 1020 | // feature updates need a KRaft cluster; ZooKeeper-mode brokers don't |
| 1021 | // advertise any features |
| 1022 | checkKafkaVersion(t, "4.0.0.0") |
| 1023 | setupFunctionalTest(t) |
| 1024 | defer teardownFunctionalTest(t) |
| 1025 | |
| 1026 | kafkaVersion, err := ParseKafkaVersion(FunctionalTestEnv.KafkaVersion) |
| 1027 | require.NoError(t, err) |
| 1028 | |
| 1029 | config := NewFunctionalTestConfig() |
| 1030 | config.Version = kafkaVersion |
| 1031 | adminClient, err := NewClusterAdmin(FunctionalTestEnv.KafkaBrokerAddrs, config) |
| 1032 | require.NoError(t, err) |
| 1033 | defer safeClose(t, adminClient) |
| 1034 | |
| 1035 | // an update for a made-up feature name should be rejected without |
| 1036 | // touching cluster state; the controller fails the whole batch with |
| 1037 | // INVALID_UPDATE_VERSION when every update in it failed |
| 1038 | _, err = adminClient.UpdateFeatures([]FeatureUpdate{{ |
| 1039 | Feature: "sarama.unsupported.feature", |
| 1040 | MaxVersionLevel: 1, |
| 1041 | }}) |
| 1042 | require.ErrorIs(t, err, ErrInvalidUpdateVersion) |
| 1043 | |
| 1044 | controller, err := adminClient.Controller() |
| 1045 | require.NoError(t, err) |
| 1046 | |
| 1047 | describeFeatures := func(t require.TestingT) (supported map[string]int16, finalized map[string]int16) { |
| 1048 | // v4 needed: v0-v3 responses omit supported features with min version 0 |
| 1049 | rsp, err := controller.ApiVersions(&ApiVersionsRequest{ |
| 1050 | Version: 4, |
| 1051 | ClientSoftwareName: defaultClientSoftwareName, |
| 1052 | ClientSoftwareVersion: version(), |
| 1053 | }) |
| 1054 | require.NoError(t, err) |
| 1055 | supported = make(map[string]int16, len(rsp.SupportedFeatures)) |
| 1056 | for _, f := range rsp.SupportedFeatures { |
| 1057 | supported[f.Name] = f.MaxVersion |
| 1058 | } |
| 1059 | finalized = make(map[string]int16, len(rsp.FinalizedFeatures)) |
| 1060 | for _, f := range rsp.FinalizedFeatures { |
| 1061 | finalized[f.Name] = f.MaxVersionLevel |
| 1062 | } |
| 1063 | return supported, finalized |
| 1064 | } |
| 1065 | |
| 1066 | supported, finalized := describeFeatures(t) |
| 1067 | require.NotEmpty(t, supported) |
| 1068 | |
| 1069 | // upgrade every feature whose finalized level sits below the supported |
| 1070 | // max, one batch each so a single rejection doesn't sink the rest |
| 1071 | upgraded := make(map[string]int16) |
| 1072 | for name, maxVersion := range supported { |
| 1073 | // kraft.version upgrades are refused on a static-quorum cluster |
| 1074 | if name == "kraft.version" || finalized[name] >= maxVersion { |
| 1075 | continue |
nothing calls this directly
no test coverage detected