MCPcopy
hub / github.com/IBM/sarama / leave

Method leave

consumer_group.go:687–743  ·  view source on GitHub ↗

Leaves the cluster, called by Close.

()

Source from the content-addressed store, hash-verified

685
686// Leaves the cluster, called by Close.
687func (c *consumerGroup) leave() error {
688 c.lock.Lock()
689 defer c.lock.Unlock()
690 if c.memberID == "" {
691 return nil
692 }
693
694 coordinator, err := c.client.Coordinator(c.groupID)
695 if err != nil {
696 return err
697 }
698
699 // as per KIP-345 if groupInstanceId is set, i.e. static membership is in action, then do not leave group when consumer closed, just clear memberID
700 if c.groupInstanceId != nil {
701 c.memberID = ""
702 return nil
703 }
704 req := &LeaveGroupRequest{
705 GroupId: c.groupID,
706 MemberId: c.memberID,
707 }
708 if c.config.Version.IsAtLeast(V3_2_0_0) {
709 req.Version = 5
710 } else if c.config.Version.IsAtLeast(V2_4_0_0) {
711 req.Version = 4
712 } else if c.config.Version.IsAtLeast(V2_0_0_0) {
713 req.Version = 2
714 } else if c.config.Version.IsAtLeast(V0_11_0_0) {
715 req.Version = 1
716 }
717 if req.Version >= 3 {
718 member := MemberIdentity{
719 MemberId: c.memberID,
720 }
721 if req.Version >= 5 {
722 reason := "the consumer is being closed"
723 member.Reason = &reason
724 }
725 req.Members = append(req.Members, member)
726 }
727
728 resp, err := coordinator.LeaveGroup(req)
729 if err != nil {
730 _ = coordinator.Close()
731 return err
732 }
733
734 // clear the memberID
735 c.memberID = ""
736
737 switch resp.Err {
738 case ErrRebalanceInProgress, ErrUnknownMemberId, ErrNoError:
739 return nil
740 default:
741 return resp.Err
742 }
743}
744

Callers 1

CloseMethod · 0.95

Calls 4

IsAtLeastMethod · 0.80
LeaveGroupMethod · 0.80
CoordinatorMethod · 0.65
CloseMethod · 0.65

Tested by

no test coverage detected