| 202 | } |
| 203 | |
| 204 | func (h *ReferenceHash) Balance(msg Message, partitions ...int) int { |
| 205 | if msg.Key == nil { |
| 206 | return h.rr.Balance(msg, partitions...) |
| 207 | } |
| 208 | |
| 209 | hasher := h.Hasher |
| 210 | if hasher != nil { |
| 211 | h.lock.Lock() |
| 212 | defer h.lock.Unlock() |
| 213 | } else { |
| 214 | hasher = fnv1aPool.Get().(hash.Hash32) |
| 215 | defer fnv1aPool.Put(hasher) |
| 216 | } |
| 217 | |
| 218 | hasher.Reset() |
| 219 | if _, err := hasher.Write(msg.Key); err != nil { |
| 220 | panic(err) |
| 221 | } |
| 222 | |
| 223 | // uses the same algorithm as the Sarama's referenceHashPartitioner. |
| 224 | // note the type conversions here. if the uint32 hash code is not cast to |
| 225 | // an int32, we do not get the same result as sarama. |
| 226 | partition := (int32(hasher.Sum32()) & 0x7fffffff) % int32(len(partitions)) |
| 227 | return int(partition) |
| 228 | } |
| 229 | |
| 230 | type randomBalancer struct { |
| 231 | mock int // mocked return value, used for testing |