(msg *ProducerMessage)
| 718 | } |
| 719 | |
| 720 | func (tp *topicProducer) partitionMessage(msg *ProducerMessage) error { |
| 721 | var partitions []int32 |
| 722 | |
| 723 | err := tp.breaker.Run(func() (err error) { |
| 724 | requiresConsistency := false |
| 725 | if ep, ok := tp.partitioner.(DynamicConsistencyPartitioner); ok { |
| 726 | requiresConsistency = ep.MessageRequiresConsistency(msg) |
| 727 | } else { |
| 728 | requiresConsistency = tp.partitioner.RequiresConsistency() |
| 729 | } |
| 730 | |
| 731 | if requiresConsistency { |
| 732 | partitions, err = tp.parent.client.Partitions(msg.Topic) |
| 733 | } else { |
| 734 | partitions, err = tp.parent.client.WritablePartitions(msg.Topic) |
| 735 | } |
| 736 | return |
| 737 | }) |
| 738 | if err != nil { |
| 739 | return err |
| 740 | } |
| 741 | |
| 742 | numPartitions := int32(len(partitions)) |
| 743 | |
| 744 | if numPartitions == 0 { |
| 745 | return ErrLeaderNotAvailable |
| 746 | } |
| 747 | |
| 748 | choice, err := tp.partitioner.Partition(msg, numPartitions) |
| 749 | |
| 750 | if err != nil { |
| 751 | return err |
| 752 | } else if choice < 0 || choice >= numPartitions { |
| 753 | return ErrInvalidPartition |
| 754 | } |
| 755 | |
| 756 | msg.Partition = partitions[choice] |
| 757 | |
| 758 | return nil |
| 759 | } |
| 760 | |
| 761 | // one per partition per topic |
| 762 | // dispatches messages to the appropriate broker |
no test coverage detected