waitForReassignment polls ListPartitionReassignments until the broker stops reporting any in-progress reassignment for the topic, or the timeout fires. Kafka only returns partitions that are actively being reassigned, so an empty TopicStatus entry means the move is complete.
( ctx context.Context, admin sarama.ClusterAdmin, topic string, partitions []int32, interval, timeout time.Duration, )
| 194 | // Kafka only returns partitions that are actively being reassigned, so an |
| 195 | // empty TopicStatus entry means the move is complete. |
| 196 | func waitForReassignment( |
| 197 | ctx context.Context, |
| 198 | admin sarama.ClusterAdmin, |
| 199 | topic string, |
| 200 | partitions []int32, |
| 201 | interval, timeout time.Duration, |
| 202 | ) error { |
| 203 | deadline := time.Now().Add(timeout) |
| 204 | ticker := time.NewTicker(interval) |
| 205 | defer ticker.Stop() |
| 206 | |
| 207 | for { |
| 208 | status, err := admin.ListPartitionReassignments(topic, partitions) |
| 209 | if err != nil { |
| 210 | return fmt.Errorf("list partition reassignments: %w", err) |
| 211 | } |
| 212 | inProgress := status[topic] |
| 213 | if len(inProgress) == 0 { |
| 214 | return nil |
| 215 | } |
| 216 | log.Printf("%d partition(s) still reassigning", len(inProgress)) |
| 217 | for p, s := range inProgress { |
| 218 | log.Printf(" partition %d: adding=%v removing=%v current=%v", p, s.AddingReplicas, s.RemovingReplicas, s.Replicas) |
| 219 | } |
| 220 | if time.Now().After(deadline) { |
| 221 | return fmt.Errorf("timed out waiting for reassignment of %q to complete", topic) |
| 222 | } |
| 223 | select { |
| 224 | case <-ctx.Done(): |
| 225 | return ctx.Err() |
| 226 | case <-ticker.C: |
| 227 | } |
| 228 | } |
| 229 | } |
| 230 | |
| 231 | func partitionIDs(assignment [][]int32) []int32 { |
| 232 | ids := make([]int32, len(assignment)) |