syncGroup completes the consumer group nextGeneration by accepting the memberAssignments (if this Reader is the leader) and returning this Readers subscriptions topic => partitions Possible kafka error codes returned: * GroupCoordinatorNotAvailable: * NotCoordinatorForGroup: * IllegalGeneration: *
(conn coordinator, memberID string, generationID int32, memberAssignments GroupMemberAssignments)
| 1079 | // * RebalanceInProgress: |
| 1080 | // * GroupAuthorizationFailed: |
| 1081 | func (cg *ConsumerGroup) syncGroup(conn coordinator, memberID string, generationID int32, memberAssignments GroupMemberAssignments) (map[string][]int32, error) { |
| 1082 | request := cg.makeSyncGroupRequestV0(memberID, generationID, memberAssignments) |
| 1083 | response, err := conn.syncGroup(request) |
| 1084 | if err == nil && response.ErrorCode != 0 { |
| 1085 | err = Error(response.ErrorCode) |
| 1086 | } |
| 1087 | if err != nil { |
| 1088 | return nil, err |
| 1089 | } |
| 1090 | |
| 1091 | assignments := groupAssignment{} |
| 1092 | reader := bufio.NewReader(bytes.NewReader(response.MemberAssignments)) |
| 1093 | if _, err := (&assignments).readFrom(reader, len(response.MemberAssignments)); err != nil { |
| 1094 | return nil, err |
| 1095 | } |
| 1096 | |
| 1097 | if len(assignments.Topics) == 0 { |
| 1098 | cg.withLogger(func(l Logger) { |
| 1099 | l.Printf("received empty assignments for group, %v as member %s for generation %d", cg.config.ID, memberID, generationID) |
| 1100 | }) |
| 1101 | } |
| 1102 | |
| 1103 | cg.withLogger(func(l Logger) { |
| 1104 | l.Printf("sync group finished for group, %v", cg.config.ID) |
| 1105 | }) |
| 1106 | |
| 1107 | return assignments.Topics, nil |
| 1108 | } |
| 1109 | |
| 1110 | func (cg *ConsumerGroup) makeSyncGroupRequestV0(memberID string, generationID int32, memberAssignments GroupMemberAssignments) syncGroupRequestV0 { |
| 1111 | request := syncGroupRequestV0{ |
no test coverage detected