joinGroup attempts to join the reader to the consumer group. Returns GroupMemberAssignments is this Reader was selected as the leader. Otherwise, GroupMemberAssignments will be nil. Possible kafka error codes returned: * GroupLoadInProgress: * GroupCoordinatorNotAvailable: * NotCoordinatorForGroup
(conn coordinator, memberID string)
| 932 | // * InvalidSessionTimeout: |
| 933 | // * GroupAuthorizationFailed: |
| 934 | func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, int32, GroupMemberAssignments, error) { |
| 935 | request, err := cg.makeJoinGroupRequest(memberID) |
| 936 | if err != nil { |
| 937 | return "", 0, nil, err |
| 938 | } |
| 939 | |
| 940 | response, err := conn.joinGroup(request) |
| 941 | if err == nil && response.ErrorCode != 0 { |
| 942 | err = Error(response.ErrorCode) |
| 943 | } |
| 944 | if err != nil { |
| 945 | return "", 0, nil, err |
| 946 | } |
| 947 | |
| 948 | memberID = response.MemberID |
| 949 | generationID := response.GenerationID |
| 950 | |
| 951 | cg.withLogger(func(l Logger) { |
| 952 | l.Printf("joined group %s as member %s in generation %d", cg.config.ID, memberID, generationID) |
| 953 | }) |
| 954 | |
| 955 | var assignments GroupMemberAssignments |
| 956 | if iAmLeader := response.MemberID == response.LeaderID; iAmLeader { |
| 957 | v, err := cg.assignTopicPartitions(conn, response) |
| 958 | if err != nil { |
| 959 | return memberID, 0, nil, err |
| 960 | } |
| 961 | assignments = v |
| 962 | |
| 963 | cg.withLogger(func(l Logger) { |
| 964 | for memberID, assignment := range assignments { |
| 965 | for topic, partitions := range assignment { |
| 966 | l.Printf("assigned member/topic/partitions %v/%v/%v", memberID, topic, partitions) |
| 967 | } |
| 968 | } |
| 969 | }) |
| 970 | } |
| 971 | |
| 972 | cg.withLogger(func(l Logger) { |
| 973 | l.Printf("joinGroup succeeded for response, %v. generationID=%v, memberID=%v", cg.config.ID, response.GenerationID, response.MemberID) |
| 974 | }) |
| 975 | |
| 976 | return memberID, generationID, assignments, nil |
| 977 | } |
| 978 | |
| 979 | // makeJoinGroupRequestV1 handles the logic of constructing a joinGroup |
| 980 | // request. |
no test coverage detected