| 296 | } |
| 297 | |
| 298 | func (b Murmur2Balancer) Balance(msg Message, partitions ...int) (partition int) { |
| 299 | // NOTE: the murmur2 balancers in java and librdkafka treat a nil key as |
| 300 | // non-existent while treating an empty slice as a defined value. |
| 301 | if msg.Key == nil && !b.Consistent { |
| 302 | return b.random.Balance(msg, partitions...) |
| 303 | } |
| 304 | |
| 305 | idx := (murmur2(msg.Key) & 0x7fffffff) % uint32(len(partitions)) |
| 306 | return partitions[idx] |
| 307 | } |
| 308 | |
| 309 | // Go port of the Java library's murmur2 function. |
| 310 | // https://github.com/apache/kafka/blob/1.0/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L353 |