()
| 829 | } |
| 830 | |
| 831 | func (pp *partitionProducer) dispatch() { |
| 832 | // try to prefetch the leader; if this doesn't work, we'll do a proper call to `updateLeader` |
| 833 | // on the first message |
| 834 | pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition) |
| 835 | if pp.leader != nil { |
| 836 | pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader) |
| 837 | pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight |
| 838 | pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn} |
| 839 | } |
| 840 | |
| 841 | defer func() { |
| 842 | if pp.brokerProducer != nil { |
| 843 | pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer) |
| 844 | } |
| 845 | }() |
| 846 | |
| 847 | for msg := range pp.input { |
| 848 | if pp.brokerProducer != nil && pp.brokerProducer.abandoned != nil { |
| 849 | select { |
| 850 | case <-pp.brokerProducer.abandoned: |
| 851 | // a message on the abandoned channel means that our current broker selection is out of date |
| 852 | Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID()) |
| 853 | pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer) |
| 854 | pp.brokerProducer = nil |
| 855 | time.Sleep(pp.parent.conf.Producer.Retry.Backoff) |
| 856 | default: |
| 857 | // producer connection is still open. |
| 858 | } |
| 859 | } |
| 860 | |
| 861 | if msg.retries > pp.highWatermark { |
| 862 | if err := pp.updateLeaderIfBrokerProducerIsNil(msg); err != nil { |
| 863 | continue |
| 864 | } |
| 865 | // a new, higher, retry level; handle it and then back off |
| 866 | pp.newHighWatermark(msg.retries) |
| 867 | pp.backoff(msg.retries) |
| 868 | } else if pp.highWatermark > 0 { |
| 869 | // we are retrying something (else highWatermark would be 0) but this message is not a *new* retry level |
| 870 | if msg.retries < pp.highWatermark { |
| 871 | // in fact this message is not even the current retry level, so buffer it for now (unless it's a just a fin) |
| 872 | if msg.flags&fin == fin { |
| 873 | pp.retryState[msg.retries].expectChaser = false |
| 874 | pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected |
| 875 | } else { |
| 876 | pp.retryState[msg.retries].buf = append(pp.retryState[msg.retries].buf, msg) |
| 877 | } |
| 878 | continue |
| 879 | } else if msg.flags&fin == fin { |
| 880 | // this message is of the current retry level (msg.retries == highWatermark) and the fin flag is set, |
| 881 | // meaning this retry level is done and we can go down (at least) one level and flush that |
| 882 | pp.retryState[pp.highWatermark].expectChaser = false |
| 883 | pp.flushRetryBuffers() |
| 884 | pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected |
| 885 | continue |
| 886 | } |
| 887 | } |
| 888 |
nothing calls this directly
no test coverage detected