MCPcopy
hub / github.com/IBM/sarama / partitionMessage

Method partitionMessage

async_producer.go:720–759  ·  view source on GitHub ↗
(msg *ProducerMessage)

Source from the content-addressed store, hash-verified

718}
719
720func (tp *topicProducer) partitionMessage(msg *ProducerMessage) error {
721 var partitions []int32
722
723 err := tp.breaker.Run(func() (err error) {
724 requiresConsistency := false
725 if ep, ok := tp.partitioner.(DynamicConsistencyPartitioner); ok {
726 requiresConsistency = ep.MessageRequiresConsistency(msg)
727 } else {
728 requiresConsistency = tp.partitioner.RequiresConsistency()
729 }
730
731 if requiresConsistency {
732 partitions, err = tp.parent.client.Partitions(msg.Topic)
733 } else {
734 partitions, err = tp.parent.client.WritablePartitions(msg.Topic)
735 }
736 return
737 })
738 if err != nil {
739 return err
740 }
741
742 numPartitions := int32(len(partitions))
743
744 if numPartitions == 0 {
745 return ErrLeaderNotAvailable
746 }
747
748 choice, err := tp.partitioner.Partition(msg, numPartitions)
749
750 if err != nil {
751 return err
752 } else if choice < 0 || choice >= numPartitions {
753 return ErrInvalidPartition
754 }
755
756 msg.Partition = partitions[choice]
757
758 return nil
759}
760
761// one per partition per topic
762// dispatches messages to the appropriate broker

Callers 1

dispatchMethod · 0.95

Calls 6

RunMethod · 0.80
RequiresConsistencyMethod · 0.65
PartitionsMethod · 0.65
WritablePartitionsMethod · 0.65
PartitionMethod · 0.65

Tested by

no test coverage detected