()
| 695 | } |
| 696 | |
| 697 | func (tp *topicProducer) dispatch() { |
| 698 | for msg := range tp.input { |
| 699 | if msg.retries == 0 { |
| 700 | if err := tp.partitionMessage(msg); err != nil { |
| 701 | tp.parent.returnError(msg, err) |
| 702 | continue |
| 703 | } |
| 704 | } |
| 705 | |
| 706 | handler := tp.handlers[msg.Partition] |
| 707 | if handler == nil { |
| 708 | handler = tp.parent.newPartitionProducer(msg.Topic, msg.Partition) |
| 709 | tp.handlers[msg.Partition] = handler |
| 710 | } |
| 711 | |
| 712 | handler <- msg |
| 713 | } |
| 714 | |
| 715 | for _, handler := range tp.handlers { |
| 716 | close(handler) |
| 717 | } |
| 718 | } |
| 719 | |
| 720 | func (tp *topicProducer) partitionMessage(msg *ProducerMessage) error { |
| 721 | var partitions []int32 |
nothing calls this directly
no test coverage detected