()
| 712 | } |
| 713 | |
| 714 | func (cg *ConsumerGroup) run() { |
| 715 | // the memberID is the only piece of information that is maintained across |
| 716 | // generations. it starts empty and will be assigned on the first nextGeneration |
| 717 | // when the joinGroup request is processed. it may change again later if |
| 718 | // the CG coordinator fails over or if the member is evicted. otherwise, it |
| 719 | // will be constant for the lifetime of this group. |
| 720 | var memberID string |
| 721 | var err error |
| 722 | for { |
| 723 | memberID, err = cg.nextGeneration(memberID) |
| 724 | |
| 725 | // backoff will be set if this go routine should sleep before continuing |
| 726 | // to the next generation. it will be non-nil in the case of an error |
| 727 | // joining or syncing the group. |
| 728 | var backoff <-chan time.Time |
| 729 | |
| 730 | switch { |
| 731 | case err == nil: |
| 732 | // no error...the previous generation finished normally. |
| 733 | continue |
| 734 | |
| 735 | case errors.Is(err, ErrGroupClosed): |
| 736 | // the CG has been closed...leave the group and exit loop. |
| 737 | _ = cg.leaveGroup(memberID) |
| 738 | return |
| 739 | |
| 740 | case errors.Is(err, RebalanceInProgress): |
| 741 | // in case of a RebalanceInProgress, don't leave the group or |
| 742 | // change the member ID, but report the error. the next attempt |
| 743 | // to join the group will then be subject to the rebalance |
| 744 | // timeout, so the broker will be responsible for throttling |
| 745 | // this loop. |
| 746 | |
| 747 | default: |
| 748 | // leave the group and report the error if we had gotten far |
| 749 | // enough so as to have a member ID. also clear the member id |
| 750 | // so we don't attempt to use it again. in order to avoid |
| 751 | // a tight error loop, backoff before the next attempt to join |
| 752 | // the group. |
| 753 | _ = cg.leaveGroup(memberID) |
| 754 | memberID = "" |
| 755 | backoff = time.After(cg.config.JoinGroupBackoff) |
| 756 | } |
| 757 | // ensure that we exit cleanly in case the CG is done and no one is |
| 758 | // waiting to receive on the unbuffered error channel. |
| 759 | select { |
| 760 | case <-cg.done: |
| 761 | return |
| 762 | case cg.errs <- err: |
| 763 | } |
| 764 | // backoff if needed, being sure to exit cleanly if the CG is done. |
| 765 | if backoff != nil { |
| 766 | select { |
| 767 | case <-cg.done: |
| 768 | // exit cleanly if the group is closed. |
| 769 | return |
| 770 | case <-backoff: |
| 771 | } |
no test coverage detected