MCPcopy
hub / github.com/IBM/sarama / waitForReassignment

Function waitForReassignment

examples/alter_partition_reassignments/main.go:196–229  ·  view source on GitHub ↗

waitForReassignment polls ListPartitionReassignments until the broker stops reporting any in-progress reassignment for the topic, or the timeout fires. Kafka only returns partitions that are actively being reassigned, so an empty TopicStatus entry means the move is complete.

(
	ctx context.Context,
	admin sarama.ClusterAdmin,
	topic string,
	partitions []int32,
	interval, timeout time.Duration,
)

Source from the content-addressed store, hash-verified

194// Kafka only returns partitions that are actively being reassigned, so an
195// empty TopicStatus entry means the move is complete.
196func waitForReassignment(
197 ctx context.Context,
198 admin sarama.ClusterAdmin,
199 topic string,
200 partitions []int32,
201 interval, timeout time.Duration,
202) error {
203 deadline := time.Now().Add(timeout)
204 ticker := time.NewTicker(interval)
205 defer ticker.Stop()
206
207 for {
208 status, err := admin.ListPartitionReassignments(topic, partitions)
209 if err != nil {
210 return fmt.Errorf("list partition reassignments: %w", err)
211 }
212 inProgress := status[topic]
213 if len(inProgress) == 0 {
214 return nil
215 }
216 log.Printf("%d partition(s) still reassigning", len(inProgress))
217 for p, s := range inProgress {
218 log.Printf(" partition %d: adding=%v removing=%v current=%v", p, s.AddingReplicas, s.RemovingReplicas, s.Replicas)
219 }
220 if time.Now().After(deadline) {
221 return fmt.Errorf("timed out waiting for reassignment of %q to complete", topic)
222 }
223 select {
224 case <-ctx.Done():
225 return ctx.Err()
226 case <-ticker.C:
227 }
228 }
229}
230
231func partitionIDs(assignment [][]int32) []int32 {
232 ids := make([]int32, len(assignment))

Callers 1

mainFunction · 0.85

Calls 6

StopMethod · 0.80
ErrorfMethod · 0.65
PrintfMethod · 0.65
DoneMethod · 0.65
AddMethod · 0.45

Tested by

no test coverage detected