(memberID string)
| 774 | } |
| 775 | |
| 776 | func (cg *ConsumerGroup) nextGeneration(memberID string) (string, error) { |
| 777 | // get a new connection to the coordinator on each loop. the previous |
| 778 | // generation could have exited due to losing the connection, so this |
| 779 | // ensures that we always have a clean starting point. it means we will |
| 780 | // re-connect in certain cases, but that shouldn't be an issue given that |
| 781 | // rebalances are relatively infrequent under normal operating |
| 782 | // conditions. |
| 783 | conn, err := cg.coordinator() |
| 784 | if err != nil { |
| 785 | cg.withErrorLogger(func(log Logger) { |
| 786 | log.Printf("Unable to establish connection to consumer group coordinator for group %s: %v", cg.config.ID, err) |
| 787 | }) |
| 788 | return memberID, err // a prior memberID may still be valid, so don't return "" |
| 789 | } |
| 790 | defer conn.Close() |
| 791 | |
| 792 | var generationID int32 |
| 793 | var groupAssignments GroupMemberAssignments |
| 794 | var assignments map[string][]int32 |
| 795 | |
| 796 | // join group. this will join the group and prepare assignments if our |
| 797 | // consumer is elected leader. it may also change or assign the member ID. |
| 798 | memberID, generationID, groupAssignments, err = cg.joinGroup(conn, memberID) |
| 799 | if err != nil { |
| 800 | cg.withErrorLogger(func(log Logger) { |
| 801 | log.Printf("Failed to join group %s: %v", cg.config.ID, err) |
| 802 | }) |
| 803 | return memberID, err |
| 804 | } |
| 805 | cg.withLogger(func(log Logger) { |
| 806 | log.Printf("Joined group %s as member %s in generation %d", cg.config.ID, memberID, generationID) |
| 807 | }) |
| 808 | |
| 809 | // sync group |
| 810 | assignments, err = cg.syncGroup(conn, memberID, generationID, groupAssignments) |
| 811 | if err != nil { |
| 812 | cg.withErrorLogger(func(log Logger) { |
| 813 | log.Printf("Failed to sync group %s: %v", cg.config.ID, err) |
| 814 | }) |
| 815 | return memberID, err |
| 816 | } |
| 817 | |
| 818 | // fetch initial offsets. |
| 819 | var offsets map[string]map[int]int64 |
| 820 | offsets, err = cg.fetchOffsets(conn, assignments) |
| 821 | if err != nil { |
| 822 | cg.withErrorLogger(func(log Logger) { |
| 823 | log.Printf("Failed to fetch offsets for group %s: %v", cg.config.ID, err) |
| 824 | }) |
| 825 | return memberID, err |
| 826 | } |
| 827 | |
| 828 | // create the generation. |
| 829 | gen := Generation{ |
| 830 | ID: generationID, |
| 831 | GroupID: cg.config.ID, |
| 832 | MemberID: memberID, |
| 833 | Assignments: cg.makeAssignments(assignments, offsets), |
no test coverage detected