MCPcopy
hub / github.com/segmentio/kafka-go / Balance

Method Balance

balancer.go:204–228  ·  balancer.go::ReferenceHash.Balance
(msg Message, partitions ...int)

Source from the content-addressed store, hash-verified

202}
203
204func (h *ReferenceHash) Balance(msg Message, partitions ...int) int {
205 if msg.Key == nil {
206 return h.rr.Balance(msg, partitions...)
207 }
208
209 hasher := h.Hasher
210 if hasher != nil {
211 h.lock.Lock()
212 defer h.lock.Unlock()
213 } else {
214 hasher = fnv1aPool.Get().(hash.Hash32)
215 defer fnv1aPool.Put(hasher)
216 }
217
218 hasher.Reset()
219 if _, err := hasher.Write(msg.Key); err != nil {
220 panic(err)
221 }
222
223 // uses the same algorithm as the Sarama's referenceHashPartitioner.
224 // note the type conversions here. if the uint32 hash code is not cast to
225 // an int32, we do not get the same result as sarama.
226 partition := (int32(hasher.Sum32()) & 0x7fffffff) % int32(len(partitions))
227 return int(partition)
228}
229
230type randomBalancer struct {
231 mock int // mocked return value, used for testing

Callers 1

Calls 5

GetMethod · 0.80
PutMethod · 0.80
BalanceMethod · 0.65
ResetMethod · 0.45
WriteMethod · 0.45

Tested by 1