MCPcopy
hub / github.com/segmentio/kafka-go / syncGroup

Method syncGroup

consumergroup.go:1081–1108  ·  view source on GitHub ↗

syncGroup completes the consumer group nextGeneration by accepting the memberAssignments (if this Reader is the leader) and returning this Reader's subscriptions (topic => partitions). Possible Kafka error codes returned: GroupCoordinatorNotAvailable, NotCoordinatorForGroup, IllegalGeneration, RebalanceInProgress, GroupAuthorizationFailed.

(conn coordinator, memberID string, generationID int32, memberAssignments GroupMemberAssignments)

Source from the content-addressed store, hash-verified

1079// * RebalanceInProgress:
1080// * GroupAuthorizationFailed:
1081func (cg *ConsumerGroup) syncGroup(conn coordinator, memberID string, generationID int32, memberAssignments GroupMemberAssignments) (map[string][]int32, error) {
1082 request := cg.makeSyncGroupRequestV0(memberID, generationID, memberAssignments)
1083 response, err := conn.syncGroup(request)
1084 if err == nil && response.ErrorCode != 0 {
1085 err = Error(response.ErrorCode)
1086 }
1087 if err != nil {
1088 return nil, err
1089 }
1090
1091 assignments := groupAssignment{}
1092 reader := bufio.NewReader(bytes.NewReader(response.MemberAssignments))
1093 if _, err := (&assignments).readFrom(reader, len(response.MemberAssignments)); err != nil {
1094 return nil, err
1095 }
1096
1097 if len(assignments.Topics) == 0 {
1098 cg.withLogger(func(l Logger) {
1099 l.Printf("received empty assignments for group, %v as member %s for generation %d", cg.config.ID, memberID, generationID)
1100 })
1101 }
1102
1103 cg.withLogger(func(l Logger) {
1104 l.Printf("sync group finished for group, %v", cg.config.ID)
1105 })
1106
1107 return assignments.Topics, nil
1108}
1109
1110func (cg *ConsumerGroup) makeSyncGroupRequestV0(memberID string, generationID int32, memberAssignments GroupMemberAssignments) syncGroupRequestV0 {
1111 request := syncGroupRequestV0{

Callers 1

nextGenerationMethod · 0.95

Calls 7

withLoggerMethod · 0.95
ErrorTypeAlias · 0.70
syncGroupMethod · 0.65
NewReaderMethod · 0.65
PrintfMethod · 0.65
readFromMethod · 0.45

Tested by

no test coverage detected