MCPcopy
hub / github.com/IBM/sarama / main

Function main

examples/alter_partition_reassignments/main.go:49–116  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

47}
48
49func main() {
50 if verbose {
51 sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
52 }
53
54 kafkaVersion, err := sarama.ParseKafkaVersion(version)
55 if err != nil {
56 log.Fatalf("parse kafka version: %v", err)
57 }
58
59 config := sarama.NewConfig()
60 config.Version = kafkaVersion
61 config.ClientID = "sarama-alter-partition-reassignments"
62
63 admin, err := sarama.NewClusterAdmin(strings.Split(brokers, ","), config)
64 if err != nil {
65 log.Fatalf("create cluster admin: %v", err)
66 }
67 defer func() {
68 if err := admin.Close(); err != nil {
69 log.Printf("close cluster admin: %v", err)
70 }
71 }()
72
73 ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
74 defer cancel()
75
76 current, err := describeTopic(admin, topic)
77 if err != nil {
78 log.Fatalf("describe topic %q: %v", topic, err)
79 }
80 log.Printf("current assignment for %q: %s", topic, formatAssignment(current))
81
82 clusterBrokers, _, err := admin.DescribeCluster()
83 if err != nil {
84 log.Fatalf("describe cluster: %v", err)
85 }
86 if len(clusterBrokers) < replicationFactor {
87 log.Fatalf("cluster has %d brokers, cannot satisfy replication factor %d", len(clusterBrokers), replicationFactor)
88 }
89 brokerIDs := make([]int32, 0, len(clusterBrokers))
90 for _, b := range clusterBrokers {
91 brokerIDs = append(brokerIDs, b.ID())
92 }
93 slices.Sort(brokerIDs)
94
95 target, changed := buildTargetAssignment(current, brokerIDs, replicationFactor)
96 if !changed {
97 log.Printf("topic %q is already at replication factor %d, nothing to do", topic, replicationFactor)
98 return
99 }
100 log.Printf("target assignment for %q: %s", topic, formatAssignment(target))
101
102 if err := admin.AlterPartitionReassignments(topic, target); err != nil {
103 log.Fatalf("alter partition reassignments: %v", err)
104 }
105 log.Printf("submitted reassignment for %q, waiting for it to complete (timeout %s)", topic, pollTimeout)
106

Callers

nothing calls this directly

Calls 14

CloseMethod · 0.95
DescribeClusterMethod · 0.95
ParseKafkaVersionFunction · 0.92
NewConfigFunction · 0.92
NewClusterAdminFunction · 0.92
describeTopicFunction · 0.85
formatAssignmentFunction · 0.85
buildTargetAssignmentFunction · 0.85
waitForReassignmentFunction · 0.85
partitionIDsFunction · 0.85
FatalfMethod · 0.80

Tested by

no test coverage detected