MCPcopy
hub / github.com/segmentio/kafka-go / joinGroup

Method joinGroup

consumergroup.go:934–977  ·  view source on GitHub ↗

joinGroup attempts to join the reader to the consumer group. Returns GroupMemberAssignments if this Reader was selected as the leader. Otherwise, GroupMemberAssignments will be nil. Possible kafka error codes returned: * GroupLoadInProgress: * GroupCoordinatorNotAvailable: * NotCoordinatorForGroup

(conn coordinator, memberID string)

Source from the content-addressed store, hash-verified

932// * InvalidSessionTimeout:
933// * GroupAuthorizationFailed:
934func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, int32, GroupMemberAssignments, error) {
935 request, err := cg.makeJoinGroupRequest(memberID)
936 if err != nil {
937 return "", 0, nil, err
938 }
939
940 response, err := conn.joinGroup(request)
941 if err == nil && response.ErrorCode != 0 {
942 err = Error(response.ErrorCode)
943 }
944 if err != nil {
945 return "", 0, nil, err
946 }
947
948 memberID = response.MemberID
949 generationID := response.GenerationID
950
951 cg.withLogger(func(l Logger) {
952 l.Printf("joined group %s as member %s in generation %d", cg.config.ID, memberID, generationID)
953 })
954
955 var assignments GroupMemberAssignments
956 if iAmLeader := response.MemberID == response.LeaderID; iAmLeader {
957 v, err := cg.assignTopicPartitions(conn, response)
958 if err != nil {
959 return memberID, 0, nil, err
960 }
961 assignments = v
962
963 cg.withLogger(func(l Logger) {
964 for memberID, assignment := range assignments {
965 for topic, partitions := range assignment {
966 l.Printf("assigned member/topic/partitions %v/%v/%v", memberID, topic, partitions)
967 }
968 }
969 })
970 }
971
972 cg.withLogger(func(l Logger) {
973 l.Printf("joinGroup succeeded for response, %v. generationID=%v, memberID=%v", cg.config.ID, response.GenerationID, response.MemberID)
974 })
975
976 return memberID, generationID, assignments, nil
977}
978
979// makeJoinGroupRequestV1 handles the logic of constructing a joinGroup
980// request.

Callers 1

nextGenerationMethod · 0.95

Calls 6

makeJoinGroupRequestMethod · 0.95
withLoggerMethod · 0.95
assignTopicPartitionsMethod · 0.95
ErrorTypeAlias · 0.70
joinGroupMethod · 0.65
PrintfMethod · 0.65

Tested by

no test coverage detected