hub / github.com/IBM/sarama / NewBalanceStrategyRange

Function NewBalanceStrategyRange

balance_strategy.go:126–146

NewBalanceStrategyRange returns a range balance strategy, which is the default and assigns partitions as ranges to consumer group members. This follows the same logic as https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/


Source from the content-addressed store, hash-verified

//	M1: {T1: [0, 1, 2], T2: [0, 1, 2]}
//	M2: {T1: [3, 4, 5], T2: [3, 4, 5]}
func NewBalanceStrategyRange() BalanceStrategy {
	return &balanceStrategy{
		name: RangeBalanceStrategyName,
		coreFn: func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) {
			partitionsPerConsumer := len(partitions) / len(memberIDs)
			consumersWithExtraPartition := len(partitions) % len(memberIDs)

			sort.Strings(memberIDs)

			for i, memberID := range memberIDs {
				min := i*partitionsPerConsumer + int(math.Min(float64(consumersWithExtraPartition), float64(i)))
				extra := 0
				if i < consumersWithExtraPartition {
					extra = 1
				}
				max := min + partitionsPerConsumer + extra
				plan.Add(memberID, topic, partitions[min:max]...)
			}
		},
	}
}

// Deprecated: use NewBalanceStrategyRange to avoid data race issue
var BalanceStrategyRange = NewBalanceStrategyRange()

Callers 8

main · Function · 0.92
main · Function · 0.92
TestBalanceStrategyRange · Function · 0.85
TestSubscriptionMetadata · Function · 0.85
NewConfig · Function · 0.85
newTestStatefulStrategy · Function · 0.85

Calls 1

Add · Method · 0.45

Tested by 4

TestBalanceStrategyRange · Function · 0.68
TestSubscriptionMetadata · Function · 0.68
newTestStatefulStrategy · Function · 0.68