-------------------------------------------------------------------- NewBalanceStrategyRange returns a range balance strategy, which is the default and assigns partitions as ranges to consumer group members. This follows the same logic as https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html
()
| 124 | // M1: {T1: [0, 1, 2], T2: [0, 1, 2]} |
| 125 | // M2: {T1: [3, 4, 5], T2: [3, 4, 5]} |
| 126 | func NewBalanceStrategyRange() BalanceStrategy { |
| 127 | return &balanceStrategy{ |
| 128 | name: RangeBalanceStrategyName, |
| 129 | coreFn: func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) { |
| 130 | partitionsPerConsumer := len(partitions) / len(memberIDs) |
| 131 | consumersWithExtraPartition := len(partitions) % len(memberIDs) |
| 132 | |
| 133 | sort.Strings(memberIDs) |
| 134 | |
| 135 | for i, memberID := range memberIDs { |
| 136 | min := i*partitionsPerConsumer + int(math.Min(float64(consumersWithExtraPartition), float64(i))) |
| 137 | extra := 0 |
| 138 | if i < consumersWithExtraPartition { |
| 139 | extra = 1 |
| 140 | } |
| 141 | max := min + partitionsPerConsumer + extra |
| 142 | plan.Add(memberID, topic, partitions[min:max]...) |
| 143 | } |
| 144 | }, |
| 145 | } |
| 146 | } |
| 147 | |
| 148 | // Deprecated: use NewBalanceStrategyRange instead to avoid a data race issue. |
| 149 | var BalanceStrategyRange = NewBalanceStrategyRange() |