describeTopic returns the current replica set for every partition, indexed by partition ID. The inner slice mirrors what Kafka stores: an ordered list of broker IDs where the first entry is the preferred leader and the rest are followers.
(admin sarama.ClusterAdmin, name string)
| 120 | // broker IDs where the first entry is the preferred leader and the rest are |
| 121 | // followers. |
| 122 | func describeTopic(admin sarama.ClusterAdmin, name string) ([][]int32, error) { |
| 123 | meta, err := admin.DescribeTopics([]string{name}) |
| 124 | if err != nil { |
| 125 | return nil, err |
| 126 | } |
| 127 | if len(meta) == 0 { |
| 128 | return nil, fmt.Errorf("topic %q not found", name) |
| 129 | } |
| 130 | t := meta[0] |
| 131 | if t.Err != sarama.ErrNoError { |
| 132 | return nil, t.Err |
| 133 | } |
| 134 | assignment := make([][]int32, len(t.Partitions)) |
| 135 | for _, p := range t.Partitions { |
| 136 | if int(p.ID) >= len(assignment) { |
| 137 | return nil, fmt.Errorf("unexpected partition id %d for topic %q with %d partitions", p.ID, name, len(t.Partitions)) |
| 138 | } |
| 139 | assignment[p.ID] = slices.Clone(p.Replicas) |
| 140 | } |
| 141 | return assignment, nil |
| 142 | } |
| 143 | |
| 144 | // buildTargetAssignment grows each partition's replica list to targetRF by |
| 145 | // appending the least-used broker IDs not already in the partition's list. |
no test coverage detected