hub / github.com/IBM/sarama / NewClient

Function NewClient

client.go:177–250

NewClient creates a new Client. It connects to one of the given broker addresses and uses that broker to automatically fetch metadata on the rest of the Kafka cluster. If metadata cannot be retrieved from any of the given broker addresses, the client is not created.
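The behaviour described above (try the seed brokers until one returns metadata, otherwise fail) can be pictured with a small standard-library model. This is only a sketch under stated assumptions: `bootstrap`, `fetchFunc`, and `errNoBrokers` are hypothetical names invented for illustration and are not part of sarama, and the real client does considerably more work (connections, retries, background refresh).

```go
package main

import (
	"errors"
	"fmt"
	"math/rand"
)

// fetchFunc is a hypothetical stand-in for "ask this broker for cluster metadata".
type fetchFunc func(addr string) ([]string, error)

// errNoBrokers is a hypothetical sentinel error, not a sarama error value.
var errNoBrokers = errors.New("no seed broker returned metadata")

// bootstrap tries the seed addresses in random order and returns the
// metadata from the first broker that answers. If none answers, it
// returns an error and no result, mirroring "the client is not created".
func bootstrap(addrs []string, fetch fetchFunc) ([]string, error) {
	if len(addrs) < 1 {
		return nil, errors.New("at least one broker address is required")
	}

	// Copy before shuffling so the caller's slice is left untouched.
	seeds := append([]string(nil), addrs...)
	rand.Shuffle(len(seeds), func(i, j int) { seeds[i], seeds[j] = seeds[j], seeds[i] })

	var lastErr error
	for _, addr := range seeds {
		topics, err := fetch(addr)
		if err == nil {
			return topics, nil
		}
		lastErr = err
	}
	return nil, fmt.Errorf("%w: last error: %v", errNoBrokers, lastErr)
}

func main() {
	// Fake fetcher: only one of the two seed brokers is reachable.
	fetch := func(addr string) ([]string, error) {
		if addr == "broker-2:9092" {
			return []string{"orders", "payments"}, nil
		}
		return nil, errors.New("connection refused")
	}

	topics, err := bootstrap([]string{"broker-1:9092", "broker-2:9092"}, fetch)
	fmt.Println(topics, err)
}
```

Because any reachable seed broker is enough, passing several addresses makes bootstrapping tolerant of a single broker being down.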

(addrs []string, conf *Config)

Source from the content-addressed store, hash-verified

// and uses that broker to automatically fetch metadata on the rest of the kafka cluster. If metadata cannot
// be retrieved from any of the given broker addresses, the client is not created.
func NewClient(addrs []string, conf *Config) (Client, error) {
	DebugLogger.Println("Initializing new client")

	if conf == nil {
		conf = NewConfig()
	}

	if err := conf.Validate(); err != nil {
		return nil, err
	}

	if len(addrs) < 1 {
		return nil, ConfigurationError("You must provide at least one broker address")
	}

	if strings.Contains(addrs[0], ".servicebus.windows.net") {
		if conf.Version.IsAtLeast(V1_1_0_0) || !conf.Version.IsAtLeast(V0_11_0_0) {
			Logger.Println("Connecting to Azure Event Hubs, forcing version to V1_0_0_0 for compatibility")
			conf.Version = V1_0_0_0
		}
	}
	client := &client{
		conf:                    conf,
		closer:                  make(chan none),
		closed:                  make(chan none),
		brokers:                 make(map[int32]*Broker),
		metadata:                make(map[string]map[int32]*PartitionMetadata),
		metadataTopics:          make(map[string]none),
		cachedPartitionsResults: make(map[string][maxPartitionIndex][]int32),
		coordinators:            make(map[string]int32),
		transactionCoordinators: make(map[string]int32),
	}
	refresh := func(topics []string) error {
		deadline := time.Time{}
		if client.conf.Metadata.Timeout > 0 {
			deadline = time.Now().Add(client.conf.Metadata.Timeout)
		}
		return client.tryRefreshMetadata(topics, client.conf.Metadata.Retry.Max, deadline)
	}
	if conf.Metadata.SingleFlight {
		client.metadataRefresh = newSingleFlightRefresher(refresh)
	} else {
		client.metadataRefresh = refresh
	}

	if conf.Net.ResolveCanonicalBootstrapServers {
		var err error
		addrs, err = client.resolveCanonicalNames(addrs)
		if err != nil {
			return nil, err
		}
	}

	client.randomizeSeedBrokers(addrs)

	if conf.Metadata.Full {
		// do an initial fetch of all cluster metadata by specifying an empty list of topics
		err := client.RefreshMetadata()
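
The Azure Event Hubs branch in the listing is easy to misread, so here is a standalone model of its condition. It is a sketch, not sarama code: the `version` type, `isAtLeast`, and `pinForEventHubs` are hypothetical stand-ins that assume versions compare component by component, as four-part version numbers normally do. Note that, as in the listing, only the first address is inspected.

```go
package main

import (
	"fmt"
	"strings"
)

// version is a hypothetical four-part version number (major, minor, patch, build).
type version [4]uint

// isAtLeast reports whether v >= other, comparing component by component.
func (v version) isAtLeast(other version) bool {
	for i := range v {
		if v[i] > other[i] {
			return true
		}
		if v[i] < other[i] {
			return false
		}
	}
	return true
}

var (
	v0_11_0_0 = version{0, 11, 0, 0}
	v1_0_0_0  = version{1, 0, 0, 0}
	v1_1_0_0  = version{1, 1, 0, 0}
)

// pinForEventHubs models the branch in the listing: when the first
// address looks like an Event Hubs endpoint, any version outside the
// range [0.11.0.0, 1.1.0.0) is replaced by 1.0.0.0. Versions already
// inside that range are left as configured.
func pinForEventHubs(firstAddr string, v version) version {
	if !strings.Contains(firstAddr, ".servicebus.windows.net") {
		return v
	}
	if v.isAtLeast(v1_1_0_0) || !v.isAtLeast(v0_11_0_0) {
		return v1_0_0_0
	}
	return v
}

func main() {
	addr := "example.servicebus.windows.net:9093" // placeholder hostname
	fmt.Println(pinForEventHubs(addr, version{2, 8, 0, 0}))          // too new: pinned
	fmt.Println(pinForEventHubs(addr, version{0, 10, 2, 0}))         // too old: pinned
	fmt.Println(pinForEventHubs(addr, version{0, 11, 0, 2}))         // in range: kept
	fmt.Println(pinForEventHubs("kafka:9092", version{2, 8, 0, 0})) // not Event Hubs: kept
}
```

One consequence worth knowing: in the listing the override is written to `conf.Version` on the `*Config` the caller passed in, so the caller's config value changes as a side effect.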

Callers 15

NewConsumer · Function · 0.70
TestInterceptors · Function · 0.70
testProducingMessages · Function · 0.70
prepareTestTopics · Function · 0.70
ensureFullyReplicated · Function · 0.70

Calls 14

tryRefreshMetadata · Method · 0.95
resolveCanonicalNames · Method · 0.95
randomizeSeedBrokers · Method · 0.95
RefreshMetadata · Method · 0.95
Close · Method · 0.95
ConfigurationError · TypeAlias · 0.85
newSingleFlightRefresher · Function · 0.85
withRecover · Function · 0.85
Validate · Method · 0.80
IsAtLeast · Method · 0.80
Is · Method · 0.80
NewConfig · Function · 0.70
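
The `tryRefreshMetadata` call listed above receives a deadline that the source derives from `conf.Metadata.Timeout`: a positive timeout becomes "now plus timeout", and anything else leaves the zero `time.Time`, which the listing uses to mean "no deadline". A minimal sketch of that rule, with `deadlineFor` and `exampleNow` as hypothetical names introduced only for this example:

```go
package main

import (
	"fmt"
	"time"
)

// exampleNow is a fixed reference time so the example is deterministic.
var exampleNow = time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)

// deadlineFor mirrors the rule in the refresh closure: a positive timeout
// yields now+timeout; zero or negative yields the zero time.Time, which
// callers can detect with IsZero and treat as "no deadline".
func deadlineFor(timeout time.Duration, now time.Time) time.Time {
	if timeout > 0 {
		return now.Add(timeout)
	}
	return time.Time{}
}

func main() {
	withTimeout := deadlineFor(5*time.Second, exampleNow)
	noTimeout := deadlineFor(0, exampleNow)

	fmt.Println(withTimeout.Sub(exampleNow)) // distance from the reference time
	fmt.Println(noTimeout.IsZero())          // reports whether the deadline is unset
}
```

Passing the clock in as a parameter, rather than calling `time.Now()` inside, is a choice made here purely to keep the sketch testable; the listing calls `time.Now()` directly.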