MCPcopy
hub / github.com/IBM/sarama / AlterPartitionReassignments

Method AlterPartitionReassignments

admin.go:681–727  ·  view source on GitHub ↗
(topic string, assignment [][]int32)

Source from the content-addressed store, hash-verified

679}
680
681func (ca *clusterAdmin) AlterPartitionReassignments(topic string, assignment [][]int32) error {
682 if topic == "" {
683 return ErrInvalidTopic
684 }
685
686 request := &AlterPartitionReassignmentsRequest{
687 TimeoutMs: int32(60000),
688 Version: int16(0),
689 }
690
691 for i := range assignment {
692 request.AddBlock(topic, int32(i), assignment[i])
693 }
694
695 return ca.retryOnError(isRetriableControllerError, func() error {
696 b, err := ca.Controller()
697 if err != nil {
698 return err
699 }
700
701 errs := make([]error, 0)
702
703 rsp, err := b.AlterPartitionReassignments(request)
704
705 if err != nil {
706 errs = append(errs, err)
707 } else {
708 if rsp.ErrorCode > 0 {
709 errs = append(errs, rsp.ErrorCode)
710 }
711
712 for topic, topicErrors := range rsp.Errors {
713 for partition, partitionError := range topicErrors {
714 if !errors.Is(partitionError.errorCode, ErrNoError) {
715 errs = append(errs, fmt.Errorf("[%s-%d]: %w", topic, partition, partitionError.errorCode))
716 }
717 }
718 }
719 }
720
721 if len(errs) > 0 {
722 return Wrap(ErrReassignPartitions, errs...)
723 }
724
725 return nil
726 })
727}
728
729func (ca *clusterAdmin) ListPartitionReassignments(topic string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error) {
730 if topic == "" {

Callers

nothing calls this directly

Calls 7

AddBlockMethod · 0.95
retryOnErrorMethod · 0.95
ControllerMethod · 0.95
WrapFunction · 0.85
IsMethod · 0.80
ErrorfMethod · 0.65

Tested by

no test coverage detected