MCPcopy
hub / github.com/IBM/sarama / newHighWatermark

Method newHighWatermark

async_producer.go:912–926  ·  view source on GitHub ↗
(hwm int)

Source from the content-addressed store, hash-verified

910}
911
912func (pp *partitionProducer) newHighWatermark(hwm int) {
913 Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, hwm)
914 pp.highWatermark = hwm
915
916 // send off a fin so that we know when everything "in between" has made it
917 // back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
918 pp.retryState[pp.highWatermark].expectChaser = true
919 pp.parent.inFlight.Add(1) // we're generating a fin message; track it so we don't shut down while it's still inflight
920 pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: fin, retries: pp.highWatermark - 1}
921
922 // a new HWM means that our current broker selection is out of date
923 Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
924 pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
925 pp.brokerProducer = nil
926}
927
928func (pp *partitionProducer) flushRetryBuffers() {
929 Logger.Printf("producer/leader/%s/%d state change to [flushing-%d]\n", pp.topic, pp.partition, pp.highWatermark)

Callers 1

dispatchMethod · 0.95

Calls 4

IDMethod · 0.80
unrefBrokerProducerMethod · 0.80
PrintfMethod · 0.65
AddMethod · 0.45

Tested by

no test coverage detected