(groups []string)
| 1229 | } |
| 1230 | |
| 1231 | func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*GroupDescription, err error) { |
| 1232 | groupsPerBroker := make(map[*Broker][]string) |
| 1233 | |
| 1234 | for _, group := range groups { |
| 1235 | coordinator, err := ca.client.Coordinator(group) |
| 1236 | if err != nil { |
| 1237 | return nil, err |
| 1238 | } |
| 1239 | groupsPerBroker[coordinator] = append(groupsPerBroker[coordinator], group) |
| 1240 | } |
| 1241 | |
| 1242 | for broker, brokerGroups := range groupsPerBroker { |
| 1243 | describeReq := &DescribeGroupsRequest{ |
| 1244 | Groups: brokerGroups, |
| 1245 | } |
| 1246 | |
| 1247 | if ca.conf.Version.IsAtLeast(V2_4_0_0) { |
| 1248 | // Starting in version 4, the response will include group.instance.id info for members. |
| 1249 | // Starting in version 5, the response uses flexible encoding |
| 1250 | describeReq.Version = 5 |
| 1251 | } else if ca.conf.Version.IsAtLeast(V2_3_0_0) { |
| 1252 | // Starting in version 3, authorized operations can be requested. |
| 1253 | describeReq.Version = 3 |
| 1254 | } else if ca.conf.Version.IsAtLeast(V2_0_0_0) { |
| 1255 | // Version 2 is the same as version 0. |
| 1256 | describeReq.Version = 2 |
| 1257 | } else if ca.conf.Version.IsAtLeast(V1_1_0_0) { |
| 1258 | // Version 1 is the same as version 0. |
| 1259 | describeReq.Version = 1 |
| 1260 | } |
| 1261 | response, err := broker.DescribeGroups(describeReq) |
| 1262 | if err != nil { |
| 1263 | return nil, err |
| 1264 | } |
| 1265 | |
| 1266 | result = append(result, response.Groups...) |
| 1267 | } |
| 1268 | return result, nil |
| 1269 | } |
| 1270 | |
| 1271 | func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err error) { |
| 1272 | allGroups = make(map[string]string) |
nothing calls this directly
no test coverage detected