MCPcopy
hub / github.com/IBM/sarama / tryRefreshMetadata

Method tryRefreshMetadata

client.go:974–1077  ·  view source on GitHub ↗
(topics []string, attemptsRemaining int, deadline time.Time)

Source from the content-addressed store, hash-verified

972}
973
974func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, deadline time.Time) error {
975 pastDeadline := func(backoff time.Duration) bool {
976 if !deadline.IsZero() && time.Now().Add(backoff).After(deadline) {
977 // we are past the deadline
978 return true
979 }
980 return false
981 }
982 retry := func(err error) error {
983 if attemptsRemaining > 0 {
984 backoff := computeMetadataBackoff(client.conf, attemptsRemaining)
985 if pastDeadline(backoff) {
986 Logger.Println("client/metadata skipping last retries as we would go past the metadata timeout")
987 return err
988 }
989 if backoff > 0 {
990 time.Sleep(backoff)
991 }
992
993 t := client.updateMetadataMs.Load()
994 if time.Since(time.UnixMilli(t)) < backoff {
995 return err
996 }
997 attemptsRemaining--
998 Logger.Printf("client/metadata retrying after %dms... (%d attempts remaining)\n", backoff/time.Millisecond, attemptsRemaining)
999
1000 return client.tryRefreshMetadata(topics, attemptsRemaining, deadline)
1001 }
1002 return err
1003 }
1004
1005 broker := client.LeastLoadedBroker()
1006 brokerErrors := make([]error, 0)
1007 for ; broker != nil && !pastDeadline(0); broker = client.LeastLoadedBroker() {
1008 allowAutoTopicCreation := client.conf.Metadata.AllowAutoTopicCreation
1009 if len(topics) > 0 {
1010 DebugLogger.Printf("client/metadata fetching metadata for %v from broker %s\n", topics, broker.addr)
1011 } else {
1012 allowAutoTopicCreation = false
1013 DebugLogger.Printf("client/metadata fetching metadata for all topics from broker %s\n", broker.addr)
1014 }
1015
1016 req := NewMetadataRequest(client.conf.Version, topics)
1017 req.AllowAutoTopicCreation = allowAutoTopicCreation
1018
1019 response, err := broker.GetMetadata(req)
1020 var kerror KError
1021 var packetEncodingError PacketEncodingError
1022 if err == nil {
1023 // When talking to the startup phase of a broker, it is possible to receive an empty metadata set. We should remove that broker and try next broker (https://issues.apache.org/jira/browse/KAFKA-7924).
1024 if len(response.Brokers) == 0 {
1025 Logger.Printf("client/metadata receiving empty brokers from the metadata response when requesting the broker #%d at %s", broker.ID(), broker.addr)
1026 _ = broker.Close()
1027 client.deregisterBroker(broker)
1028 continue
1029 }
1030 allKnownMetaData := len(topics) == 0
1031 // valid response, use it

Callers 1

NewClientFunction · 0.95

Calls 14

LeastLoadedBrokerMethod · 0.95
deregisterBrokerMethod · 0.95
updateMetadataMethod · 0.95
resurrectDeadBrokersMethod · 0.95
computeMetadataBackoffFunction · 0.85
NewMetadataRequestFunction · 0.85
WrapFunction · 0.85
GetMetadataMethod · 0.80
IDMethod · 0.80
IsMethod · 0.80
PrintlnMethod · 0.65
PrintfMethod · 0.65

Tested by

no test coverage detected