MCPcopy
hub / github.com/IBM/sarama / newPartitionProducer

Method newPartitionProducer

async_producer.go:787–800  ·  view source on GitHub ↗
(topic string, partition int32)

Source from the content-addressed store, hash-verified

785}
786
787func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan<- *ProducerMessage {
788 input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
789 pp := &partitionProducer{
790 parent: p,
791 topic: topic,
792 partition: partition,
793 input: input,
794
795 breaker: breaker.New(3, 1, 10*time.Second),
796 retryState: make([]partitionRetryState, p.conf.Producer.Retry.Max+1),
797 }
798 go withRecover(pp.dispatch)
799 return input
800}
801
802func (pp *partitionProducer) backoff(retries int) {
803 pp.parent.backoff(retries)

Callers 1

dispatch (Method) · 0.80

Calls 1

withRecover (Function) · 0.85

Tested by

no test coverage detected