(group string, groupInstanceIds []string)
| 1721 | } |
| 1722 | |
| 1723 | func (ca *clusterAdmin) RemoveMemberFromConsumerGroup(group string, groupInstanceIds []string) (*LeaveGroupResponse, error) { |
| 1724 | if !ca.conf.Version.IsAtLeast(V2_4_0_0) { |
| 1725 | return nil, ConfigurationError("Removing members from a consumer group headers requires Kafka version of at least v2.4.0") |
| 1726 | } |
| 1727 | var response *LeaveGroupResponse |
| 1728 | request := &LeaveGroupRequest{ |
| 1729 | Version: 3, |
| 1730 | GroupId: group, |
| 1731 | } |
| 1732 | for _, instanceId := range groupInstanceIds { |
| 1733 | groupInstanceId := instanceId |
| 1734 | request.Members = append(request.Members, MemberIdentity{ |
| 1735 | GroupInstanceId: &groupInstanceId, |
| 1736 | }) |
| 1737 | } |
| 1738 | err := ca.retryOnError(isRetriableGroupCoordinatorError, func() (err error) { |
| 1739 | defer func() { |
| 1740 | if err != nil && isRetriableGroupCoordinatorError(err) { |
| 1741 | _ = ca.client.RefreshCoordinator(group) |
| 1742 | } |
| 1743 | }() |
| 1744 | |
| 1745 | coordinator, err := ca.client.Coordinator(group) |
| 1746 | if err != nil { |
| 1747 | return err |
| 1748 | } |
| 1749 | |
| 1750 | response, err = coordinator.LeaveGroup(request) |
| 1751 | if err != nil { |
| 1752 | return err |
| 1753 | } |
| 1754 | if !errors.Is(response.Err, ErrNoError) { |
| 1755 | return response.Err |
| 1756 | } |
| 1757 | |
| 1758 | return nil |
| 1759 | }) |
| 1760 | |
| 1761 | return response, err |
| 1762 | } |
nothing calls this directly
no test coverage detected