| 111 | } |
| 112 | |
| 113 | func (r RoundRobinGroupBalancer) AssignGroups(members []GroupMember, topicPartitions []Partition) GroupMemberAssignments { |
| 114 | groupAssignments := GroupMemberAssignments{} |
| 115 | membersByTopic := findMembersByTopic(members) |
| 116 | for topic, members := range membersByTopic { |
| 117 | partitionIDs := findPartitions(topic, topicPartitions) |
| 118 | memberCount := len(members) |
| 119 | |
| 120 | for memberIndex, member := range members { |
| 121 | assignmentsByTopic, ok := groupAssignments[member.ID] |
| 122 | if !ok { |
| 123 | assignmentsByTopic = map[string][]int{} |
| 124 | groupAssignments[member.ID] = assignmentsByTopic |
| 125 | } |
| 126 | |
| 127 | for partitionIndex, partition := range partitionIDs { |
| 128 | if (partitionIndex % memberCount) == memberIndex { |
| 129 | assignmentsByTopic[topic] = append(assignmentsByTopic[topic], partition) |
| 130 | } |
| 131 | } |
| 132 | } |
| 133 | } |
| 134 | |
| 135 | return groupAssignments |
| 136 | } |
| 137 | |
| 138 | // RackAffinityGroupBalancer makes a best effort to pair up consumers with |
| 139 | // partitions whose leader is in the same rack. This strategy can have |