NewClient creates a new Client. It connects to one of the given broker addresses and uses that broker to automatically fetch metadata on the rest of the kafka cluster. If metadata cannot be retrieved from any of the given broker addresses, the client is not created.
(addrs []string, conf *Config)
| 175 | // and uses that broker to automatically fetch metadata on the rest of the kafka cluster. If metadata cannot |
| 176 | // be retrieved from any of the given broker addresses, the client is not created. |
| 177 | func NewClient(addrs []string, conf *Config) (Client, error) { |
| 178 | DebugLogger.Println("Initializing new client") |
| 179 | |
| 180 | if conf == nil { |
| 181 | conf = NewConfig() |
| 182 | } |
| 183 | |
| 184 | if err := conf.Validate(); err != nil { |
| 185 | return nil, err |
| 186 | } |
| 187 | |
| 188 | if len(addrs) < 1 { |
| 189 | return nil, ConfigurationError("You must provide at least one broker address") |
| 190 | } |
| 191 | |
| 192 | if strings.Contains(addrs[0], ".servicebus.windows.net") { |
| 193 | if conf.Version.IsAtLeast(V1_1_0_0) || !conf.Version.IsAtLeast(V0_11_0_0) { |
| 194 | Logger.Println("Connecting to Azure Event Hubs, forcing version to V1_0_0_0 for compatibility") |
| 195 | conf.Version = V1_0_0_0 |
| 196 | } |
| 197 | } |
| 198 | client := &client{ |
| 199 | conf: conf, |
| 200 | closer: make(chan none), |
| 201 | closed: make(chan none), |
| 202 | brokers: make(map[int32]*Broker), |
| 203 | metadata: make(map[string]map[int32]*PartitionMetadata), |
| 204 | metadataTopics: make(map[string]none), |
| 205 | cachedPartitionsResults: make(map[string][maxPartitionIndex][]int32), |
| 206 | coordinators: make(map[string]int32), |
| 207 | transactionCoordinators: make(map[string]int32), |
| 208 | } |
| 209 | refresh := func(topics []string) error { |
| 210 | deadline := time.Time{} |
| 211 | if client.conf.Metadata.Timeout > 0 { |
| 212 | deadline = time.Now().Add(client.conf.Metadata.Timeout) |
| 213 | } |
| 214 | return client.tryRefreshMetadata(topics, client.conf.Metadata.Retry.Max, deadline) |
| 215 | } |
| 216 | if conf.Metadata.SingleFlight { |
| 217 | client.metadataRefresh = newSingleFlightRefresher(refresh) |
| 218 | } else { |
| 219 | client.metadataRefresh = refresh |
| 220 | } |
| 221 | |
| 222 | if conf.Net.ResolveCanonicalBootstrapServers { |
| 223 | var err error |
| 224 | addrs, err = client.resolveCanonicalNames(addrs) |
| 225 | if err != nil { |
| 226 | return nil, err |
| 227 | } |
| 228 | } |
| 229 | |
| 230 | client.randomizeSeedBrokers(addrs) |
| 231 | |
| 232 | if conf.Metadata.Full { |
| 233 | // do an initial fetch of all cluster metadata by specifying an empty list of topics |
| 234 | err := client.RefreshMetadata() |