MCPcopy
hub / github.com/IBM/sarama / flushRetryBuffers

Method flushRetryBuffers

async_producer.go:928–959  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

926}
927
928func (pp *partitionProducer) flushRetryBuffers() {
929 Logger.Printf("producer/leader/%s/%d state change to [flushing-%d]\n", pp.topic, pp.partition, pp.highWatermark)
930 for {
931 pp.highWatermark--
932
933 if pp.brokerProducer == nil {
934 if err := pp.updateLeader(); err != nil {
935 pp.parent.returnErrors(pp.retryState[pp.highWatermark].buf, err)
936 goto flushDone
937 }
938 Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
939 }
940
941 for _, msg := range pp.retryState[pp.highWatermark].buf {
942 if pp.parent.conf.Producer.Idempotent && msg.retries == 0 && msg.flags == 0 && !msg.hasSequence {
943 msg.sequenceNumber, msg.producerEpoch = pp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
944 msg.hasSequence = true
945 }
946 pp.brokerProducer.input <- msg
947 }
948
949 flushDone:
950 pp.retryState[pp.highWatermark].buf = nil
951 if pp.retryState[pp.highWatermark].expectChaser {
952 Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, pp.highWatermark)
953 break
954 } else if pp.highWatermark == 0 {
955 Logger.Printf("producer/leader/%s/%d state change to [normal]\n", pp.topic, pp.partition)
956 break
957 }
958 }
959}
960
961func (pp *partitionProducer) updateLeader() error {
962 return pp.breaker.Run(func() (err error) {

Calls 5

updateLeaderMethod · 0.95
returnErrorsMethod · 0.80
IDMethod · 0.80
PrintfMethod · 0.65