(hwm int)
| 910 | } |
| 911 | |
| 912 | func (pp *partitionProducer) newHighWatermark(hwm int) { |
| 913 | Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, hwm) |
| 914 | pp.highWatermark = hwm |
| 915 | |
| 916 | // send off a fin so that we know when everything "in between" has made it |
| 917 | // back to us and we can safely flush the backlog (otherwise we risk re-ordering messages) |
| 918 | pp.retryState[pp.highWatermark].expectChaser = true |
| 919 | pp.parent.inFlight.Add(1) // we're generating a fin message; track it so we don't shut down while it's still inflight |
| 920 | pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: fin, retries: pp.highWatermark - 1} |
| 921 | |
| 922 | // a new HWM means that our current broker selection is out of date |
| 923 | Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID()) |
| 924 | pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer) |
| 925 | pp.brokerProducer = nil |
| 926 | } |
| 927 | |
| 928 | func (pp *partitionProducer) flushRetryBuffers() { |
| 929 | Logger.Printf("producer/leader/%s/%d state change to [flushing-%d]\n", pp.topic, pp.partition, pp.highWatermark) |
no test coverage detected