(msg *ProducerMessage)
| 817 | } |
| 818 | |
| 819 | func (pp *partitionProducer) updateLeaderIfBrokerProducerIsNil(msg *ProducerMessage) error { |
| 820 | if pp.brokerProducer == nil { |
| 821 | if err := pp.updateLeader(); err != nil { |
| 822 | pp.parent.returnError(msg, err) |
| 823 | pp.backoff(msg.retries) |
| 824 | return err |
| 825 | } |
| 826 | Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID()) |
| 827 | } |
| 828 | return nil |
| 829 | } |
| 830 | |
| 831 | func (pp *partitionProducer) dispatch() { |
| 832 | // try to prefetch the leader; if this doesn't work, we'll do a proper call to `updateLeader` |
no test coverage detected