()
| 47 | } |
| 48 | |
| 49 | func main() { |
| 50 | if verbose { |
| 51 | sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags) |
| 52 | } |
| 53 | |
| 54 | kafkaVersion, err := sarama.ParseKafkaVersion(version) |
| 55 | if err != nil { |
| 56 | log.Fatalf("parse kafka version: %v", err) |
| 57 | } |
| 58 | |
| 59 | config := sarama.NewConfig() |
| 60 | config.Version = kafkaVersion |
| 61 | config.ClientID = "sarama-alter-partition-reassignments" |
| 62 | |
| 63 | admin, err := sarama.NewClusterAdmin(strings.Split(brokers, ","), config) |
| 64 | if err != nil { |
| 65 | log.Fatalf("create cluster admin: %v", err) |
| 66 | } |
| 67 | defer func() { |
| 68 | if err := admin.Close(); err != nil { |
| 69 | log.Printf("close cluster admin: %v", err) |
| 70 | } |
| 71 | }() |
| 72 | |
| 73 | ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) |
| 74 | defer cancel() |
| 75 | |
| 76 | current, err := describeTopic(admin, topic) |
| 77 | if err != nil { |
| 78 | log.Fatalf("describe topic %q: %v", topic, err) |
| 79 | } |
| 80 | log.Printf("current assignment for %q: %s", topic, formatAssignment(current)) |
| 81 | |
| 82 | clusterBrokers, _, err := admin.DescribeCluster() |
| 83 | if err != nil { |
| 84 | log.Fatalf("describe cluster: %v", err) |
| 85 | } |
| 86 | if len(clusterBrokers) < replicationFactor { |
| 87 | log.Fatalf("cluster has %d brokers, cannot satisfy replication factor %d", len(clusterBrokers), replicationFactor) |
| 88 | } |
| 89 | brokerIDs := make([]int32, 0, len(clusterBrokers)) |
| 90 | for _, b := range clusterBrokers { |
| 91 | brokerIDs = append(brokerIDs, b.ID()) |
| 92 | } |
| 93 | slices.Sort(brokerIDs) |
| 94 | |
| 95 | target, changed := buildTargetAssignment(current, brokerIDs, replicationFactor) |
| 96 | if !changed { |
| 97 | log.Printf("topic %q is already at replication factor %d, nothing to do", topic, replicationFactor) |
| 98 | return |
| 99 | } |
| 100 | log.Printf("target assignment for %q: %s", topic, formatAssignment(target)) |
| 101 | |
| 102 | if err := admin.AlterPartitionReassignments(topic, target); err != nil { |
| 103 | log.Fatalf("alter partition reassignments: %v", err) |
| 104 | } |
| 105 | log.Printf("submitted reassignment for %q, waiting for it to complete (timeout %s)", topic, pollTimeout) |
| 106 |
nothing calls this directly
no test coverage detected