(topics []string, attemptsRemaining int, deadline time.Time)
| 972 | } |
| 973 | |
| 974 | func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, deadline time.Time) error { |
| 975 | pastDeadline := func(backoff time.Duration) bool { |
| 976 | if !deadline.IsZero() && time.Now().Add(backoff).After(deadline) { |
| 977 | // we are past the deadline |
| 978 | return true |
| 979 | } |
| 980 | return false |
| 981 | } |
| 982 | retry := func(err error) error { |
| 983 | if attemptsRemaining > 0 { |
| 984 | backoff := computeMetadataBackoff(client.conf, attemptsRemaining) |
| 985 | if pastDeadline(backoff) { |
| 986 | Logger.Println("client/metadata skipping last retries as we would go past the metadata timeout") |
| 987 | return err |
| 988 | } |
| 989 | if backoff > 0 { |
| 990 | time.Sleep(backoff) |
| 991 | } |
| 992 | |
| 993 | t := client.updateMetadataMs.Load() |
| 994 | if time.Since(time.UnixMilli(t)) < backoff { |
| 995 | return err |
| 996 | } |
| 997 | attemptsRemaining-- |
| 998 | Logger.Printf("client/metadata retrying after %dms... (%d attempts remaining)\n", backoff/time.Millisecond, attemptsRemaining) |
| 999 | |
| 1000 | return client.tryRefreshMetadata(topics, attemptsRemaining, deadline) |
| 1001 | } |
| 1002 | return err |
| 1003 | } |
| 1004 | |
| 1005 | broker := client.LeastLoadedBroker() |
| 1006 | brokerErrors := make([]error, 0) |
| 1007 | for ; broker != nil && !pastDeadline(0); broker = client.LeastLoadedBroker() { |
| 1008 | allowAutoTopicCreation := client.conf.Metadata.AllowAutoTopicCreation |
| 1009 | if len(topics) > 0 { |
| 1010 | DebugLogger.Printf("client/metadata fetching metadata for %v from broker %s\n", topics, broker.addr) |
| 1011 | } else { |
| 1012 | allowAutoTopicCreation = false |
| 1013 | DebugLogger.Printf("client/metadata fetching metadata for all topics from broker %s\n", broker.addr) |
| 1014 | } |
| 1015 | |
| 1016 | req := NewMetadataRequest(client.conf.Version, topics) |
| 1017 | req.AllowAutoTopicCreation = allowAutoTopicCreation |
| 1018 | |
| 1019 | response, err := broker.GetMetadata(req) |
| 1020 | var kerror KError |
| 1021 | var packetEncodingError PacketEncodingError |
| 1022 | if err == nil { |
| 1023 | // When talking to the startup phase of a broker, it is possible to receive an empty metadata set. We should remove that broker and try next broker (https://issues.apache.org/jira/browse/KAFKA-7924). |
| 1024 | if len(response.Brokers) == 0 { |
| 1025 | Logger.Printf("client/metadata receiving empty brokers from the metadata response when requesting the broker #%d at %s", broker.ID(), broker.addr) |
| 1026 | _ = broker.Close() |
| 1027 | client.deregisterBroker(broker) |
| 1028 | continue |
| 1029 | } |
| 1030 | allKnownMetaData := len(topics) == 0 |
| 1031 | // valid response, use it |
no test coverage detected