MCPcopy
hub / github.com/segmentio/kafka-go / AssignGroups

Method AssignGroups

groupbalancer.go:113–136  ·  view source on GitHub ↗
(members []GroupMember, topicPartitions []Partition)

Source from the content-addressed store, hash-verified

111}
112
113func (r RoundRobinGroupBalancer) AssignGroups(members []GroupMember, topicPartitions []Partition) GroupMemberAssignments {
114 groupAssignments := GroupMemberAssignments{}
115 membersByTopic := findMembersByTopic(members)
116 for topic, members := range membersByTopic {
117 partitionIDs := findPartitions(topic, topicPartitions)
118 memberCount := len(members)
119
120 for memberIndex, member := range members {
121 assignmentsByTopic, ok := groupAssignments[member.ID]
122 if !ok {
123 assignmentsByTopic = map[string][]int{}
124 groupAssignments[member.ID] = assignmentsByTopic
125 }
126
127 for partitionIndex, partition := range partitionIDs {
128 if (partitionIndex % memberCount) == memberIndex {
129 assignmentsByTopic[topic] = append(assignmentsByTopic[topic], partition)
130 }
131 }
132 }
133 }
134
135 return groupAssignments
136}
137
138// RackAffinityGroupBalancer makes a best effort to pair up consumers with
139// partitions whose leader is in the same rack. This strategy can have

Callers 2

TestClientLeaveGroupFunction · 0.95
TestClientSyncGroupFunction · 0.95

Calls 2

findMembersByTopicFunction · 0.85
findPartitionsFunction · 0.85

Tested by 2

TestClientLeaveGroupFunction · 0.76
TestClientSyncGroupFunction · 0.76