MCPcopy
hub / github.com/IBM/sarama / dispatch

Method dispatch

async_producer.go:831–910  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

829}
830
831func (pp *partitionProducer) dispatch() {
832 // try to prefetch the leader; if this doesn't work, we'll do a proper call to `updateLeader`
833 // on the first message
834 pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
835 if pp.leader != nil {
836 pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
837 pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
838 pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
839 }
840
841 defer func() {
842 if pp.brokerProducer != nil {
843 pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
844 }
845 }()
846
847 for msg := range pp.input {
848 if pp.brokerProducer != nil && pp.brokerProducer.abandoned != nil {
849 select {
850 case <-pp.brokerProducer.abandoned:
851 // a message on the abandoned channel means that our current broker selection is out of date
852 Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
853 pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
854 pp.brokerProducer = nil
855 time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
856 default:
857 // producer connection is still open.
858 }
859 }
860
861 if msg.retries > pp.highWatermark {
862 if err := pp.updateLeaderIfBrokerProducerIsNil(msg); err != nil {
863 continue
864 }
865 // a new, higher, retry level; handle it and then back off
866 pp.newHighWatermark(msg.retries)
867 pp.backoff(msg.retries)
868 } else if pp.highWatermark > 0 {
869 // we are retrying something (else highWatermark would be 0) but this message is not a *new* retry level
870 if msg.retries < pp.highWatermark {
871 // in fact this message is not even the current retry level, so buffer it for now (unless it's just a fin)
872 if msg.flags&fin == fin {
873 pp.retryState[msg.retries].expectChaser = false
874 pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
875 } else {
876 pp.retryState[msg.retries].buf = append(pp.retryState[msg.retries].buf, msg)
877 }
878 continue
879 } else if msg.flags&fin == fin {
880 // this message is of the current retry level (msg.retries == highWatermark) and the fin flag is set,
881 // meaning this retry level is done and we can go down (at least) one level and flush that
882 pp.retryState[pp.highWatermark].expectChaser = false
883 pp.flushRetryBuffers()
884 pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
885 continue
886 }
887 }
888

Callers

nothing calls this directly

Calls 14

newHighWatermark · Method · 0.95
backoff · Method · 0.95
flushRetryBuffers · Method · 0.95
getBrokerProducer · Method · 0.80
unrefBrokerProducer · Method · 0.80
ID · Method · 0.80
Leader · Method · 0.65
Printf · Method · 0.65
Done · Method · 0.65

Tested by

no test coverage detected