Leaves the cluster, called by Close.
()
| 685 | |
| 686 | // Leaves the cluster, called by Close. |
| 687 | func (c *consumerGroup) leave() error { |
| 688 | c.lock.Lock() |
| 689 | defer c.lock.Unlock() |
| 690 | if c.memberID == "" { |
| 691 | return nil |
| 692 | } |
| 693 | |
| 694 | coordinator, err := c.client.Coordinator(c.groupID) |
| 695 | if err != nil { |
| 696 | return err |
| 697 | } |
| 698 | |
| 699 | // as per KIP-345 if groupInstanceId is set, i.e. static membership is in action, then do not leave group when consumer closed, just clear memberID |
| 700 | if c.groupInstanceId != nil { |
| 701 | c.memberID = "" |
| 702 | return nil |
| 703 | } |
| 704 | req := &LeaveGroupRequest{ |
| 705 | GroupId: c.groupID, |
| 706 | MemberId: c.memberID, |
| 707 | } |
| 708 | if c.config.Version.IsAtLeast(V3_2_0_0) { |
| 709 | req.Version = 5 |
| 710 | } else if c.config.Version.IsAtLeast(V2_4_0_0) { |
| 711 | req.Version = 4 |
| 712 | } else if c.config.Version.IsAtLeast(V2_0_0_0) { |
| 713 | req.Version = 2 |
| 714 | } else if c.config.Version.IsAtLeast(V0_11_0_0) { |
| 715 | req.Version = 1 |
| 716 | } |
| 717 | if req.Version >= 3 { |
| 718 | member := MemberIdentity{ |
| 719 | MemberId: c.memberID, |
| 720 | } |
| 721 | if req.Version >= 5 { |
| 722 | reason := "the consumer is being closed" |
| 723 | member.Reason = &reason |
| 724 | } |
| 725 | req.Members = append(req.Members, member) |
| 726 | } |
| 727 | |
| 728 | resp, err := coordinator.LeaveGroup(req) |
| 729 | if err != nil { |
| 730 | _ = coordinator.Close() |
| 731 | return err |
| 732 | } |
| 733 | |
| 734 | // clear the memberID |
| 735 | c.memberID = "" |
| 736 | |
| 737 | switch resp.Err { |
| 738 | case ErrRebalanceInProgress, ErrUnknownMemberId, ErrNoError: |
| 739 | return nil |
| 740 | default: |
| 741 | return resp.Err |
| 742 | } |
| 743 | } |
| 744 |
no test coverage detected