assignTopicPartitions uses the selected GroupBalancer to assign members to their various partitions.
(conn coordinator, group joinGroupResponse)
| 1008 | // assignTopicPartitions uses the selected GroupBalancer to assign members to |
| 1009 | // their various partitions. |
| 1010 | func (cg *ConsumerGroup) assignTopicPartitions(conn coordinator, group joinGroupResponse) (GroupMemberAssignments, error) { |
| 1011 | cg.withLogger(func(l Logger) { |
| 1012 | l.Printf("selected as leader for group, %s\n", cg.config.ID) |
| 1013 | }) |
| 1014 | |
| 1015 | balancer, ok := findGroupBalancer(group.GroupProtocol, cg.config.GroupBalancers) |
| 1016 | if !ok { |
| 1017 | // NOTE : this shouldn't happen in practice...the broker should not |
| 1018 | // return successfully from joinGroup unless all members support |
| 1019 | // at least one common protocol. |
| 1020 | return nil, fmt.Errorf("unable to find selected balancer, %v, for group, %v", group.GroupProtocol, cg.config.ID) |
| 1021 | } |
| 1022 | |
| 1023 | members, err := cg.makeMemberProtocolMetadata(group.Members) |
| 1024 | if err != nil { |
| 1025 | return nil, err |
| 1026 | } |
| 1027 | |
| 1028 | topics := extractTopics(members) |
| 1029 | partitions, err := conn.readPartitions(topics...) |
| 1030 | |
| 1031 | // it's not a failure if the topic doesn't exist yet. it results in no |
| 1032 | // assignments for the topic. this matches the behavior of the official |
| 1033 | // clients: java, python, and librdkafka. |
| 1034 | // a topic watcher can trigger a rebalance when the topic comes into being. |
| 1035 | if err != nil && !errors.Is(err, UnknownTopicOrPartition) { |
| 1036 | return nil, err |
| 1037 | } |
| 1038 | |
| 1039 | cg.withLogger(func(l Logger) { |
| 1040 | l.Printf("using '%v' balancer to assign group, %v", group.GroupProtocol, cg.config.ID) |
| 1041 | for _, member := range members { |
| 1042 | l.Printf("found member: %v/%#v", member.ID, member.UserData) |
| 1043 | } |
| 1044 | for _, partition := range partitions { |
| 1045 | l.Printf("found topic/partition: %v/%v", partition.Topic, partition.ID) |
| 1046 | } |
| 1047 | }) |
| 1048 | |
| 1049 | return balancer.AssignGroups(members, partitions), nil |
| 1050 | } |
| 1051 | |
| 1052 | // makeMemberProtocolMetadata maps encoded member metadata ([]byte) into []GroupMember. |
| 1053 | func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMember) ([]GroupMember, error) { |