MCPcopy
hub / github.com/IBM/sarama / TestFuncAdminUpdateFeatures

Function TestFuncAdminUpdateFeatures

functional_admin_test.go:1018–1100  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

1016}
1017
1018func TestFuncAdminUpdateFeatures(t *testing.T) {
1019 t.Parallel()
1020 // feature updates need a KRaft cluster; ZooKeeper-mode brokers don't
1021 // advertise any features
1022 checkKafkaVersion(t, "4.0.0.0")
1023 setupFunctionalTest(t)
1024 defer teardownFunctionalTest(t)
1025
1026 kafkaVersion, err := ParseKafkaVersion(FunctionalTestEnv.KafkaVersion)
1027 require.NoError(t, err)
1028
1029 config := NewFunctionalTestConfig()
1030 config.Version = kafkaVersion
1031 adminClient, err := NewClusterAdmin(FunctionalTestEnv.KafkaBrokerAddrs, config)
1032 require.NoError(t, err)
1033 defer safeClose(t, adminClient)
1034
1035 // an update for a made-up feature name should be rejected without
1036 // touching cluster state; the controller fails the whole batch with
1037 // INVALID_UPDATE_VERSION when every update in it failed
1038 _, err = adminClient.UpdateFeatures([]FeatureUpdate{{
1039 Feature: "sarama.unsupported.feature",
1040 MaxVersionLevel: 1,
1041 }})
1042 require.ErrorIs(t, err, ErrInvalidUpdateVersion)
1043
1044 controller, err := adminClient.Controller()
1045 require.NoError(t, err)
1046
1047 describeFeatures := func(t require.TestingT) (supported map[string]int16, finalized map[string]int16) {
1048 // v4 needed: v0-v3 responses omit supported features with min version 0
1049 rsp, err := controller.ApiVersions(&ApiVersionsRequest{
1050 Version: 4,
1051 ClientSoftwareName: defaultClientSoftwareName,
1052 ClientSoftwareVersion: version(),
1053 })
1054 require.NoError(t, err)
1055 supported = make(map[string]int16, len(rsp.SupportedFeatures))
1056 for _, f := range rsp.SupportedFeatures {
1057 supported[f.Name] = f.MaxVersion
1058 }
1059 finalized = make(map[string]int16, len(rsp.FinalizedFeatures))
1060 for _, f := range rsp.FinalizedFeatures {
1061 finalized[f.Name] = f.MaxVersionLevel
1062 }
1063 return supported, finalized
1064 }
1065
1066 supported, finalized := describeFeatures(t)
1067 require.NotEmpty(t, supported)
1068
1069 // upgrade every feature whose finalized level sits below the supported
1070 // max, one batch each so a single rejection doesn't sink the rest
1071 upgraded := make(map[string]int16)
1072 for name, maxVersion := range supported {
1073 // kraft.version upgrades are refused on a static-quorum cluster
1074 if name == "kraft.version" || finalized[name] >= maxVersion {
1075 continue

Callers

nothing calls this directly

Calls 12

UpdateFeatures · Method · 0.95
Controller · Method · 0.95
checkKafkaVersion · Function · 0.85
setupFunctionalTest · Function · 0.85
teardownFunctionalTest · Function · 0.85
ParseKafkaVersion · Function · 0.85
NewFunctionalTestConfig · Function · 0.85
NewClusterAdmin · Function · 0.85
version · Function · 0.85
ApiVersions · Method · 0.80
safeClose · Function · 0.70
Len · Method · 0.45

Tested by

no test coverage detected