MCPcopy
hub / github.com/segmentio/kafka-go / run

Method run

consumergroup.go:714–774  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

712}
713
714func (cg *ConsumerGroup) run() {
715 // the memberID is the only piece of information that is maintained across
716 // generations. it starts empty and will be assigned on the first nextGeneration
717 // when the joinGroup request is processed. it may change again later if
718 // the CG coordinator fails over or if the member is evicted. otherwise, it
719 // will be constant for the lifetime of this group.
720 var memberID string
721 var err error
722 for {
723 memberID, err = cg.nextGeneration(memberID)
724
725 // backoff will be set if this go routine should sleep before continuing
726 // to the next generation. it will be non-nil in the case of an error
727 // joining or syncing the group.
728 var backoff <-chan time.Time
729
730 switch {
731 case err == nil:
732 // no error...the previous generation finished normally.
733 continue
734
735 case errors.Is(err, ErrGroupClosed):
736 // the CG has been closed...leave the group and exit loop.
737 _ = cg.leaveGroup(memberID)
738 return
739
740 case errors.Is(err, RebalanceInProgress):
741 // in case of a RebalanceInProgress, don't leave the group or
742 // change the member ID, but report the error. the next attempt
743 // to join the group will then be subject to the rebalance
744 // timeout, so the broker will be responsible for throttling
745 // this loop.
746
747 default:
748 // leave the group and report the error if we had gotten far
749 // enough so as to have a member ID. also clear the member id
750 // so we don't attempt to use it again. in order to avoid
751 // a tight error loop, backoff before the next attempt to join
752 // the group.
753 _ = cg.leaveGroup(memberID)
754 memberID = ""
755 backoff = time.After(cg.config.JoinGroupBackoff)
756 }
757 // ensure that we exit cleanly in case the CG is done and no one is
758 // waiting to receive on the unbuffered error channel.
759 select {
760 case <-cg.done:
761 return
762 case cg.errs <- err:
763 }
764 // backoff if needed, being sure to exit cleanly if the CG is done.
765 if backoff != nil {
766 select {
767 case <-cg.done:
768 // exit cleanly if the group is closed.
769 return
770 case <-backoff:
771 }

Callers 1

NewConsumerGroupFunction · 0.95

Calls 2

nextGenerationMethod · 0.95
leaveGroupMethod · 0.95

Tested by

no test coverage detected