MCPcopy
hub / github.com/IBM/sarama / updateLeader

Method updateLeader

async_producer.go:961–977  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

959}
960
961func (pp *partitionProducer) updateLeader() error {
962 return pp.breaker.Run(func() (err error) {
963 if err = pp.parent.client.RefreshMetadata(pp.topic); err != nil {
964 return err
965 }
966
967 if pp.leader, err = pp.parent.client.Leader(pp.topic, pp.partition); err != nil {
968 return err
969 }
970
971 pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
972 pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
973 pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
974
975 return nil
976 })
977}
978
979// one per broker; also constructs an associated flusher
980func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {

Callers 2

flushRetryBuffersMethod · 0.95

Calls 5

RunMethod · 0.80
getBrokerProducerMethod · 0.80
RefreshMetadataMethod · 0.65
LeaderMethod · 0.65
AddMethod · 0.45

Tested by

no test coverage detected