MCPcopy
hub / github.com/IBM/sarama / DescribeConsumerGroups

Method DescribeConsumerGroups

admin.go:1231–1269  ·  view source on GitHub ↗
(groups []string)

Source from the content-addressed store, hash-verified

1229}
1230
1231func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*GroupDescription, err error) {
1232 groupsPerBroker := make(map[*Broker][]string)
1233
1234 for _, group := range groups {
1235 coordinator, err := ca.client.Coordinator(group)
1236 if err != nil {
1237 return nil, err
1238 }
1239 groupsPerBroker[coordinator] = append(groupsPerBroker[coordinator], group)
1240 }
1241
1242 for broker, brokerGroups := range groupsPerBroker {
1243 describeReq := &DescribeGroupsRequest{
1244 Groups: brokerGroups,
1245 }
1246
1247 if ca.conf.Version.IsAtLeast(V2_4_0_0) {
1248 // Starting in version 4, the response will include group.instance.id info for members.
1249 // Starting in version 5, the response uses flexible encoding
1250 describeReq.Version = 5
1251 } else if ca.conf.Version.IsAtLeast(V2_3_0_0) {
1252 // Starting in version 3, authorized operations can be requested.
1253 describeReq.Version = 3
1254 } else if ca.conf.Version.IsAtLeast(V2_0_0_0) {
1255 // Version 2 is the same as version 0.
1256 describeReq.Version = 2
1257 } else if ca.conf.Version.IsAtLeast(V1_1_0_0) {
1258 // Version 1 is the same as version 0.
1259 describeReq.Version = 1
1260 }
1261 response, err := broker.DescribeGroups(describeReq)
1262 if err != nil {
1263 return nil, err
1264 }
1265
1266 result = append(result, response.Groups...)
1267 }
1268 return result, nil
1269}
1270
1271func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err error) {
1272 allGroups = make(map[string]string)

Callers

nothing calls this directly

Calls 3

IsAtLeastMethod · 0.80
DescribeGroupsMethod · 0.80
CoordinatorMethod · 0.65

Tested by

no test coverage detected