MCPcopy
hub / github.com/segmentio/kafka-go / assignTopicPartitions

Method assignTopicPartitions

consumergroup.go:1010–1050  ·  view source on GitHub ↗

assignTopicPartitions uses the selected GroupBalancer to assign members to their various partitions.

(conn coordinator, group joinGroupResponse)

Source from the content-addressed store, hash-verified

1008// assignTopicPartitions uses the selected GroupBalancer to assign members to
1009// their various partitions.
1010func (cg *ConsumerGroup) assignTopicPartitions(conn coordinator, group joinGroupResponse) (GroupMemberAssignments, error) {
1011 cg.withLogger(func(l Logger) {
1012 l.Printf("selected as leader for group, %s\n", cg.config.ID)
1013 })
1014
1015 balancer, ok := findGroupBalancer(group.GroupProtocol, cg.config.GroupBalancers)
1016 if !ok {
1017 // NOTE : this shouldn't happen in practice...the broker should not
1018 // return successfully from joinGroup unless all members support
1019 // at least one common protocol.
1020 return nil, fmt.Errorf("unable to find selected balancer, %v, for group, %v", group.GroupProtocol, cg.config.ID)
1021 }
1022
1023 members, err := cg.makeMemberProtocolMetadata(group.Members)
1024 if err != nil {
1025 return nil, err
1026 }
1027
1028 topics := extractTopics(members)
1029 partitions, err := conn.readPartitions(topics...)
1030
1031 // it's not a failure if the topic doesn't exist yet. it results in no
1032 // assignments for the topic. this matches the behavior of the official
1033 // clients: java, python, and librdkafka.
1034 // a topic watcher can trigger a rebalance when the topic comes into being.
1035 if err != nil && !errors.Is(err, UnknownTopicOrPartition) {
1036 return nil, err
1037 }
1038
1039 cg.withLogger(func(l Logger) {
1040 l.Printf("using '%v' balancer to assign group, %v", group.GroupProtocol, cg.config.ID)
1041 for _, member := range members {
1042 l.Printf("found member: %v/%#v", member.ID, member.UserData)
1043 }
1044 for _, partition := range partitions {
1045 l.Printf("found topic/partition: %v/%v", partition.Topic, partition.ID)
1046 }
1047 })
1048
1049 return balancer.AssignGroups(members, partitions), nil
1050}
1051
1052// makeMemberProtocolMetadata maps encoded member metadata ([]byte) into []GroupMember.
1053func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMember) ([]GroupMember, error) {

Callers 2

joinGroupMethod · 0.95

Calls 7

withLoggerMethod · 0.95
findGroupBalancerFunction · 0.85
extractTopicsFunction · 0.85
PrintfMethod · 0.65
readPartitionsMethod · 0.65
AssignGroupsMethod · 0.65

Tested by 1