MCPcopy
hub / github.com/IBM/sarama / RemoveMemberFromConsumerGroup

Method RemoveMemberFromConsumerGroup

admin.go:1723–1762  ·  view source on GitHub ↗
(group string, groupInstanceIds []string)

Source from the content-addressed store, hash-verified

1721}
1722
1723func (ca *clusterAdmin) RemoveMemberFromConsumerGroup(group string, groupInstanceIds []string) (*LeaveGroupResponse, error) {
1724 if !ca.conf.Version.IsAtLeast(V2_4_0_0) {
1725 return nil, ConfigurationError("Removing members from a consumer group headers requires Kafka version of at least v2.4.0")
1726 }
1727 var response *LeaveGroupResponse
1728 request := &LeaveGroupRequest{
1729 Version: 3,
1730 GroupId: group,
1731 }
1732 for _, instanceId := range groupInstanceIds {
1733 groupInstanceId := instanceId
1734 request.Members = append(request.Members, MemberIdentity{
1735 GroupInstanceId: &groupInstanceId,
1736 })
1737 }
1738 err := ca.retryOnError(isRetriableGroupCoordinatorError, func() (err error) {
1739 defer func() {
1740 if err != nil && isRetriableGroupCoordinatorError(err) {
1741 _ = ca.client.RefreshCoordinator(group)
1742 }
1743 }()
1744
1745 coordinator, err := ca.client.Coordinator(group)
1746 if err != nil {
1747 return err
1748 }
1749
1750 response, err = coordinator.LeaveGroup(request)
1751 if err != nil {
1752 return err
1753 }
1754 if !errors.Is(response.Err, ErrNoError) {
1755 return response.Err
1756 }
1757
1758 return nil
1759 })
1760
1761 return response, err
1762}

Callers

nothing calls this directly

Calls 8

retryOnErrorMethod · 0.95
ConfigurationErrorTypeAlias · 0.85
IsAtLeastMethod · 0.80
LeaveGroupMethod · 0.80
IsMethod · 0.80
RefreshCoordinatorMethod · 0.65
CoordinatorMethod · 0.65

Tested by

no test coverage detected