()
| 959 | } |
| 960 | |
| 961 | func (pp *partitionProducer) updateLeader() error { |
| 962 | return pp.breaker.Run(func() (err error) { |
| 963 | if err = pp.parent.client.RefreshMetadata(pp.topic); err != nil { |
| 964 | return err |
| 965 | } |
| 966 | |
| 967 | if pp.leader, err = pp.parent.client.Leader(pp.topic, pp.partition); err != nil { |
| 968 | return err |
| 969 | } |
| 970 | |
| 971 | pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader) |
| 972 | pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight |
| 973 | pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn} |
| 974 | |
| 975 | return nil |
| 976 | }) |
| 977 | } |
| 978 | |
| 979 | // one per broker; also constructs an associated flusher |
| 980 | func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer { |
no test coverage detected