if no fatal error, returns a list of topics that need retrying due to ErrLeaderNotAvailable
(data *MetadataResponse, allKnownMetaData bool)
| 1078 | |
| 1079 | // if no fatal error, returns a list of topics that need retrying due to ErrLeaderNotAvailable |
| 1080 | func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bool) (bool, error) { |
| 1081 | if client.Closed() { |
| 1082 | return false, nil |
| 1083 | } |
| 1084 | |
| 1085 | client.lock.Lock() |
| 1086 | defer client.lock.Unlock() |
| 1087 | |
| 1088 | // Check health of existing brokers, including seed brokers, dead |
| 1089 | // seed brokers, and registered brokers. |
| 1090 | // - if error occurred on broker's tcp socket, close the tcp |
| 1091 | // connection. |
| 1092 | // - if it's seed broker or dead seed broker, remove it from |
| 1093 | // the list. |
| 1094 | client.checkBrokersHealth() |
| 1095 | |
| 1096 | // For all the brokers we received: |
| 1097 | // - if it is a new ID, save it |
| 1098 | // - if it is an existing ID, but the address we have is stale, discard the old one and save it |
| 1099 | // - if some brokers is not exist in it, remove old broker |
| 1100 | // - otherwise ignore it, replacing our existing one would just bounce the connection |
| 1101 | client.updateBroker(data.Brokers) |
| 1102 | |
| 1103 | client.controllerID = data.ControllerID |
| 1104 | |
| 1105 | if allKnownMetaData { |
| 1106 | client.metadata = make(map[string]map[int32]*PartitionMetadata) |
| 1107 | client.metadataTopics = make(map[string]none) |
| 1108 | client.cachedPartitionsResults = make(map[string][maxPartitionIndex][]int32) |
| 1109 | } |
| 1110 | topicErrs := make(refreshError) |
| 1111 | retry := false |
| 1112 | for _, topic := range data.Topics { |
| 1113 | // topics must be added firstly to `metadataTopics` to guarantee that all |
| 1114 | // requested topics must be recorded to keep them trackable for periodically |
| 1115 | // metadata refresh. |
| 1116 | if _, exists := client.metadataTopics[topic.Name]; !exists { |
| 1117 | client.metadataTopics[topic.Name] = none{} |
| 1118 | } |
| 1119 | delete(client.metadata, topic.Name) |
| 1120 | delete(client.cachedPartitionsResults, topic.Name) |
| 1121 | |
| 1122 | switch topic.Err { |
| 1123 | case ErrNoError: |
| 1124 | // no-op |
| 1125 | case ErrInvalidTopic, ErrTopicAuthorizationFailed: // don't retry, don't store partial results |
| 1126 | topicErrs.addError(topic.Name, topic.Err) |
| 1127 | continue |
| 1128 | case ErrUnknownTopicOrPartition: // retry, do not store partial partition results |
| 1129 | topicErrs.addError(topic.Name, topic.Err) |
| 1130 | retry = true |
| 1131 | continue |
| 1132 | case ErrLeaderNotAvailable: // retry, but store partial partition results |
| 1133 | topicErrs.addError(topic.Name, topic.Err) |
| 1134 | retry = true |
| 1135 | default: // don't retry, don't store partial results |
| 1136 | Logger.Printf("Unexpected topic-level metadata error: %s", topic.Err) |
| 1137 | topicErrs.addError(topic.Name, topic.Err) |
no test coverage detected