MCPcopy
hub / github.com/IBM/sarama / Partition

Method Partition

partitioner.go:209–240  ·  view source on GitHub ↗
(message *ProducerMessage, numPartitions int32)

Source from the content-addressed store, hash-verified

207}
208
209func (p *hashPartitioner) Partition(message *ProducerMessage, numPartitions int32) (int32, error) {
210 if message.Key == nil {
211 return p.random.Partition(message, numPartitions)
212 }
213 bytes, err := message.Key.Encode()
214 if err != nil {
215 return -1, err
216 }
217 p.hasher.Reset()
218 _, err = p.hasher.Write(bytes)
219 if err != nil {
220 return -1, err
221 }
222 var partition int32
223 // Turns out we were doing our absolute value in a subtly different way from the upstream
224 // implementation, but now we need to maintain backwards compat for people who started using
225 // the old version; if referenceAbs is set we are compatible with the reference java client
226 // but not past Sarama versions
227 if p.referenceAbs {
228 partition = (int32(p.hasher.Sum32()) & 0x7fffffff) % numPartitions
229 } else if p.hashUnsigned {
230 // librdkafka treats the hashed value as unsigned. If `hashUnsigned` is set we are compatible
231 // with librdkafka's `consistent` partitioning but not past Sarama versions
232 partition = int32(p.hasher.Sum32() % uint32(numPartitions))
233 } else {
234 partition = int32(p.hasher.Sum32()) % numPartitions
235 if partition < 0 {
236 partition = -partition
237 }
238 }
239 return partition, nil
240}
241
242func (p *hashPartitioner) RequiresConsistency() bool {
243 return true

Callers

nothing calls this directly

Calls 3

WriteMethod · 0.80
PartitionMethod · 0.65
EncodeMethod · 0.65

Tested by

no test coverage detected