| 785 | } |
| 786 | |
| 787 | func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan<- *ProducerMessage { |
| 788 | input := make(chan *ProducerMessage, p.conf.ChannelBufferSize) |
| 789 | pp := &partitionProducer{ |
| 790 | parent: p, |
| 791 | topic: topic, |
| 792 | partition: partition, |
| 793 | input: input, |
| 794 | |
| 795 | breaker: breaker.New(3, 1, 10*time.Second), |
| 796 | retryState: make([]partitionRetryState, p.conf.Producer.Retry.Max+1), |
| 797 | } |
| 798 | go withRecover(pp.dispatch) |
| 799 | return input |
| 800 | } |
| 801 | |
| 802 | func (pp *partitionProducer) backoff(retries int) { |
| 803 | pp.parent.backoff(retries) |