| 926 | } |
| 927 | |
| 928 | func (pp *partitionProducer) flushRetryBuffers() { |
| 929 | Logger.Printf("producer/leader/%s/%d state change to [flushing-%d]\n", pp.topic, pp.partition, pp.highWatermark) |
| 930 | for { |
| 931 | pp.highWatermark-- |
| 932 | |
| 933 | if pp.brokerProducer == nil { |
| 934 | if err := pp.updateLeader(); err != nil { |
| 935 | pp.parent.returnErrors(pp.retryState[pp.highWatermark].buf, err) |
| 936 | goto flushDone |
| 937 | } |
| 938 | Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID()) |
| 939 | } |
| 940 | |
| 941 | for _, msg := range pp.retryState[pp.highWatermark].buf { |
| 942 | if pp.parent.conf.Producer.Idempotent && msg.retries == 0 && msg.flags == 0 && !msg.hasSequence { |
| 943 | msg.sequenceNumber, msg.producerEpoch = pp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition) |
| 944 | msg.hasSequence = true |
| 945 | } |
| 946 | pp.brokerProducer.input <- msg |
| 947 | } |
| 948 | |
| 949 | flushDone: |
| 950 | pp.retryState[pp.highWatermark].buf = nil |
| 951 | if pp.retryState[pp.highWatermark].expectChaser { |
| 952 | Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, pp.highWatermark) |
| 953 | break |
| 954 | } else if pp.highWatermark == 0 { |
| 955 | Logger.Printf("producer/leader/%s/%d state change to [normal]\n", pp.topic, pp.partition) |
| 956 | break |
| 957 | } |
| 958 | } |
| 959 | } |
| 960 | |
| 961 | func (pp *partitionProducer) updateLeader() error { |
| 962 | return pp.breaker.Run(func() (err error) { |