MCPcopy
hub / github.com/IBM/sarama / updateMetadata

Method updateMetadata

client.go:1080–1160  ·  view source on GitHub ↗

if no fatal error, returns a list of topics that need retrying due to ErrLeaderNotAvailable

(data *MetadataResponse, allKnownMetaData bool)

Source from the content-addressed store, hash-verified

1078
1079// if no fatal error, returns a list of topics that need retrying due to ErrLeaderNotAvailable
1080func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bool) (bool, error) {
1081 if client.Closed() {
1082 return false, nil
1083 }
1084
1085 client.lock.Lock()
1086 defer client.lock.Unlock()
1087
1088 // Check health of existing brokers, including seed brokers, dead
1089 // seed brokers, and registered brokers.
1090 // - if error occurred on broker's tcp socket, close the tcp
1091 // connection.
1092 // - if it's seed broker or dead seed broker, remove it from
1093 // the list.
1094 client.checkBrokersHealth()
1095
1096 // For all the brokers we received:
1097 // - if it is a new ID, save it
1098 // - if it is an existing ID, but the address we have is stale, discard the old one and save it
1099 // - if some brokers is not exist in it, remove old broker
1100 // - otherwise ignore it, replacing our existing one would just bounce the connection
1101 client.updateBroker(data.Brokers)
1102
1103 client.controllerID = data.ControllerID
1104
1105 if allKnownMetaData {
1106 client.metadata = make(map[string]map[int32]*PartitionMetadata)
1107 client.metadataTopics = make(map[string]none)
1108 client.cachedPartitionsResults = make(map[string][maxPartitionIndex][]int32)
1109 }
1110 topicErrs := make(refreshError)
1111 retry := false
1112 for _, topic := range data.Topics {
1113 // topics must be added firstly to `metadataTopics` to guarantee that all
1114 // requested topics must be recorded to keep them trackable for periodically
1115 // metadata refresh.
1116 if _, exists := client.metadataTopics[topic.Name]; !exists {
1117 client.metadataTopics[topic.Name] = none{}
1118 }
1119 delete(client.metadata, topic.Name)
1120 delete(client.cachedPartitionsResults, topic.Name)
1121
1122 switch topic.Err {
1123 case ErrNoError:
1124 // no-op
1125 case ErrInvalidTopic, ErrTopicAuthorizationFailed: // don't retry, don't store partial results
1126 topicErrs.addError(topic.Name, topic.Err)
1127 continue
1128 case ErrUnknownTopicOrPartition: // retry, do not store partial partition results
1129 topicErrs.addError(topic.Name, topic.Err)
1130 retry = true
1131 continue
1132 case ErrLeaderNotAvailable: // retry, but store partial partition results
1133 topicErrs.addError(topic.Name, topic.Err)
1134 retry = true
1135 default: // don't retry, don't store partial results
1136 Logger.Printf("Unexpected topic-level metadata error: %s", topic.Err)
1137 topicErrs.addError(topic.Name, topic.Err)

Callers 1

tryRefreshMetadataMethod · 0.95

Calls 7

ClosedMethod · 0.95
checkBrokersHealthMethod · 0.95
updateBrokerMethod · 0.95
setPartitionCacheMethod · 0.95
addErrorMethod · 0.80
IsMethod · 0.80
PrintfMethod · 0.65

Tested by

no test coverage detected