(topic string, assignment [][]int32)
| 679 | } |
| 680 | |
| 681 | func (ca *clusterAdmin) AlterPartitionReassignments(topic string, assignment [][]int32) error { |
| 682 | if topic == "" { |
| 683 | return ErrInvalidTopic |
| 684 | } |
| 685 | |
| 686 | request := &AlterPartitionReassignmentsRequest{ |
| 687 | TimeoutMs: int32(60000), |
| 688 | Version: int16(0), |
| 689 | } |
| 690 | |
| 691 | for i := range assignment { |
| 692 | request.AddBlock(topic, int32(i), assignment[i]) |
| 693 | } |
| 694 | |
| 695 | return ca.retryOnError(isRetriableControllerError, func() error { |
| 696 | b, err := ca.Controller() |
| 697 | if err != nil { |
| 698 | return err |
| 699 | } |
| 700 | |
| 701 | errs := make([]error, 0) |
| 702 | |
| 703 | rsp, err := b.AlterPartitionReassignments(request) |
| 704 | |
| 705 | if err != nil { |
| 706 | errs = append(errs, err) |
| 707 | } else { |
| 708 | if rsp.ErrorCode > 0 { |
| 709 | errs = append(errs, rsp.ErrorCode) |
| 710 | } |
| 711 | |
| 712 | for topic, topicErrors := range rsp.Errors { |
| 713 | for partition, partitionError := range topicErrors { |
| 714 | if !errors.Is(partitionError.errorCode, ErrNoError) { |
| 715 | errs = append(errs, fmt.Errorf("[%s-%d]: %w", topic, partition, partitionError.errorCode)) |
| 716 | } |
| 717 | } |
| 718 | } |
| 719 | } |
| 720 | |
| 721 | if len(errs) > 0 { |
| 722 | return Wrap(ErrReassignPartitions, errs...) |
| 723 | } |
| 724 | |
| 725 | return nil |
| 726 | }) |
| 727 | } |
| 728 | |
| 729 | func (ca *clusterAdmin) ListPartitionReassignments(topic string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error) { |
| 730 | if topic == "" { |
nothing calls this directly
no test coverage detected