| 141 | ) |
| 142 | |
| 143 | type client struct { |
| 144 | // updateMetadataMs stores the time at which metadata was last updated. |
| 145 | // Note: this is accessed atomically, so it is kept as the first word in the struct |
| 146 | // as per golang/go#41970. NOTE(review): if this is sync/atomic's Int64, alignment is automatic - confirm. |
| 147 | updateMetadataMs atomic.Int64 |
| 148 | |
| 149 | conf *Config |
| 150 | closer, closed chan none // channels used for shutting down the background metadata updater |
| 151 | |
| 152 | // The broker addresses given to us through the constructor are not guaranteed to be returned in |
| 153 | // the cluster metadata (unconfirmed: it may only return brokers that are currently leading partitions), |
| 154 | // so we store them separately. |
| 155 | seedBrokers []*Broker |
| 156 | deadSeeds []*Broker |
| 157 | |
| 158 | controllerID int32 // cluster controller broker ID |
| 159 | brokers map[int32]*Broker // maps broker IDs to brokers |
| 160 | metadata map[string]map[int32]*PartitionMetadata // maps topics to partition IDs to metadata |
| 161 | metadataTopics map[string]none // topics for which metadata needs to be collected |
| 162 | coordinators map[string]int32 // maps consumer group names to coordinating broker IDs |
| 163 | transactionCoordinators map[string]int32 // maps transaction IDs to coordinating broker IDs |
| 164 | |
| 165 | // If the number of partitions is large, we can get some churn calling cachedPartitions, |
| 166 | // so the result is cached. It is important to update this value whenever metadata is changed. |
| 167 | cachedPartitionsResults map[string][maxPartitionIndex][]int32 |
| 168 | |
| 169 | lock sync.RWMutex // protects access to the maps that hold cluster state |
| 170 | |
| 171 | metadataRefresh metadataRefresh |
| 172 | } |
| 173 | |
| 174 | // NewClient creates a new Client. It connects to one of the given broker addresses |
| 175 | // and uses that broker to automatically fetch metadata on the rest of the kafka cluster. If metadata cannot |
nothing calls this directly
no outgoing calls
no test coverage detected