NewConsistentCRCHashPartitioner is like NewHashPartitioner except that it uses the *unsigned* crc32 hash of the encoded bytes of the message key modulo the number of partitions. This is compatible with librdkafka's `consistent_random` partitioner.
(topic string)
| 198 | // of the encoded bytes of the message key modulus the number of partitions. This is compatible with |
| 199 | // librdkafka's `consistent_random` partitioner |
| 200 | func NewConsistentCRCHashPartitioner(topic string) Partitioner { |
| 201 | return &hashPartitioner{ |
| 202 | random: NewRandomPartitioner(topic), |
| 203 | hasher: crc32.NewIEEE(), // CRC-32 using the IEEE polynomial |
| 204 | referenceAbs: false, |
| 205 | hashUnsigned: true, // use the unsigned form of the hash, as the doc comment describes |
| 206 | } |
| 207 | } |
| 208 | |
| 209 | func (p *hashPartitioner) Partition(message *ProducerMessage, numPartitions int32) (int32, error) { |
| 210 | if message.Key == nil { |