MCPcopy
hub / github.com/segmentio/kafka-go / nextGeneration

Method nextGeneration

consumergroup.go:776–875  ·  view source on GitHub ↗
(memberID string)

Source from the content-addressed store, hash-verified

774}
775
776func (cg *ConsumerGroup) nextGeneration(memberID string) (string, error) {
777 // get a new connection to the coordinator on each loop. the previous
778 // generation could have exited due to losing the connection, so this
779 // ensures that we always have a clean starting point. it means we will
780 // re-connect in certain cases, but that shouldn't be an issue given that
781 // rebalances are relatively infrequent under normal operating
782 // conditions.
783 conn, err := cg.coordinator()
784 if err != nil {
785 cg.withErrorLogger(func(log Logger) {
786 log.Printf("Unable to establish connection to consumer group coordinator for group %s: %v", cg.config.ID, err)
787 })
788 return memberID, err // a prior memberID may still be valid, so don't return ""
789 }
790 defer conn.Close()
791
792 var generationID int32
793 var groupAssignments GroupMemberAssignments
794 var assignments map[string][]int32
795
796 // join group. this will join the group and prepare assignments if our
797 // consumer is elected leader. it may also change or assign the member ID.
798 memberID, generationID, groupAssignments, err = cg.joinGroup(conn, memberID)
799 if err != nil {
800 cg.withErrorLogger(func(log Logger) {
801 log.Printf("Failed to join group %s: %v", cg.config.ID, err)
802 })
803 return memberID, err
804 }
805 cg.withLogger(func(log Logger) {
806 log.Printf("Joined group %s as member %s in generation %d", cg.config.ID, memberID, generationID)
807 })
808
809 // sync group
810 assignments, err = cg.syncGroup(conn, memberID, generationID, groupAssignments)
811 if err != nil {
812 cg.withErrorLogger(func(log Logger) {
813 log.Printf("Failed to sync group %s: %v", cg.config.ID, err)
814 })
815 return memberID, err
816 }
817
818 // fetch initial offsets.
819 var offsets map[string]map[int]int64
820 offsets, err = cg.fetchOffsets(conn, assignments)
821 if err != nil {
822 cg.withErrorLogger(func(log Logger) {
823 log.Printf("Failed to fetch offsets for group %s: %v", cg.config.ID, err)
824 })
825 return memberID, err
826 }
827
828 // create the generation.
829 gen := Generation{
830 ID: generationID,
831 GroupID: cg.config.ID,
832 MemberID: memberID,
833 Assignments: cg.makeAssignments(assignments, offsets),

Callers 1

runMethod · 0.95

Calls 12

coordinatorMethod · 0.95
withErrorLoggerMethod · 0.95
joinGroupMethod · 0.95
withLoggerMethod · 0.95
syncGroupMethod · 0.95
fetchOffsetsMethod · 0.95
makeAssignmentsMethod · 0.95
heartbeatLoopMethod · 0.95
partitionWatcherMethod · 0.95
closeMethod · 0.95
PrintfMethod · 0.65
CloseMethod · 0.45

Tested by

no test coverage detected