| 207 | } |
| 208 | |
| 209 | func (p *hashPartitioner) Partition(message *ProducerMessage, numPartitions int32) (int32, error) { |
| 210 | if message.Key == nil { |
| 211 | return p.random.Partition(message, numPartitions) |
| 212 | } |
| 213 | bytes, err := message.Key.Encode() |
| 214 | if err != nil { |
| 215 | return -1, err |
| 216 | } |
| 217 | p.hasher.Reset() |
| 218 | _, err = p.hasher.Write(bytes) |
| 219 | if err != nil { |
| 220 | return -1, err |
| 221 | } |
| 222 | var partition int32 |
| 223 | // Turns out we were doing our absolute value in a subtly different way from the upstream |
| 224 | // implementation, but now we need to maintain backwards compat for people who started using |
| 225 | // the old version; if referenceAbs is set we are compatible with the reference java client |
| 226 | // but not past Sarama versions |
| 227 | if p.referenceAbs { |
| 228 | partition = (int32(p.hasher.Sum32()) & 0x7fffffff) % numPartitions |
| 229 | } else if p.hashUnsigned { |
| 230 | // librdkafka treats the hashed value as unsigned. If `hashUnsigned` is set we are compatible |
| 231 | // with librdkafka's `consistent` partitioning but not past Sarama versions |
| 232 | partition = int32(p.hasher.Sum32() % uint32(numPartitions)) |
| 233 | } else { |
| 234 | partition = int32(p.hasher.Sum32()) % numPartitions |
| 235 | if partition < 0 { |
| 236 | partition = -partition |
| 237 | } |
| 238 | } |
| 239 | return partition, nil |
| 240 | } |
| 241 | |
| 242 | func (p *hashPartitioner) RequiresConsistency() bool { |
| 243 | return true |