MCPcopy
hub / github.com/segmentio/kafka-go / Balance

Method Balance

balancer.go:153–181  ·  balancer.go::Hash.Balance
(msg Message, partitions ...int)

Source from the content-addressed store, hash-verified

151}
152
153func (h *Hash) Balance(msg Message, partitions ...int) int {
154 if msg.Key == nil {
155 return h.rr.Balance(msg, partitions...)
156 }
157
158 hasher := h.Hasher
159 if hasher != nil {
160 h.lock.Lock()
161 defer h.lock.Unlock()
162 } else {
163 hasher = fnv1aPool.Get().(hash.Hash32)
164 defer fnv1aPool.Put(hasher)
165 }
166
167 hasher.Reset()
168 if _, err := hasher.Write(msg.Key); err != nil {
169 panic(err)
170 }
171
172 // uses same algorithm that Sarama's hashPartitioner uses
173 // note the type conversions here. if the uint32 hash code is not cast to
174 // an int32, we do not get the same result as sarama.
175 partition := int32(hasher.Sum32()) % int32(len(partitions))
176 if partition < 0 {
177 partition = -partition
178 }
179
180 return int(partition)
181}
182
183// ReferenceHash is a Balancer that uses the provided hash function to determine which
184// partition to route messages to. This ensures that messages with the same key

Callers 1

TestHashBalancerFunction · 0.95

Calls 5

GetMethod · 0.80
PutMethod · 0.80
BalanceMethod · 0.65
ResetMethod · 0.45
WriteMethod · 0.45

Tested by 1

TestHashBalancerFunction · 0.76