MCPcopy
hub / github.com/IBM/sarama / Open

Method Open

broker.go:189–337  ·  view source on GitHub ↗

Open tries to connect to the Broker if it is not already connected or connecting, but does not block waiting for the connection to complete. This means that any subsequent operations on the broker will block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open

(conf *Config)

Source from the content-addressed store, hash-verified

187// follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or
188// AlreadyConnected. If conf is nil, the result of NewConfig() is used.
189func (b *Broker) Open(conf *Config) error {
190 if !b.opened.CompareAndSwap(false, true) {
191 return ErrAlreadyConnected
192 }
193
194 if conf == nil {
195 conf = NewConfig()
196 }
197
198 err := conf.Validate()
199 if err != nil {
200 return err
201 }
202
203 b.lock.Lock()
204
205 if b.metricRegistry == nil {
206 b.metricRegistry = newCleanupRegistry(conf.MetricRegistry)
207 }
208
209 go withRecover(func() {
210 defer b.lock.Unlock()
211
212 dialer := conf.getDialer()
213 b.conn, b.connErr = dialer.Dial("tcp", b.addr)
214 if b.connErr != nil {
215 Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
216 b.conn = nil
217 b.opened.Store(false)
218 return
219 }
220 if conf.Net.TLS.Enable {
221 b.conn = tls.Client(b.conn, validServerNameTLS(b.addr, conf.Net.TLS.Config))
222 }
223
224 b.conn = newBufConn(b.conn)
225 b.conf = conf
226
227 // Create or reuse the global metrics shared between brokers
228 b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", b.metricRegistry)
229 b.requestRate = metrics.GetOrRegisterMeter("request-rate", b.metricRegistry)
230 b.fetchRate = metrics.GetOrRegisterMeter("consumer-fetch-rate", b.metricRegistry)
231 b.requestSize = getOrRegisterHistogram("request-size", b.metricRegistry)
232 b.requestLatency = getOrRegisterHistogram("request-latency-in-ms", b.metricRegistry)
233 b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", b.metricRegistry)
234 b.responseRate = metrics.GetOrRegisterMeter("response-rate", b.metricRegistry)
235 b.responseSize = getOrRegisterHistogram("response-size", b.metricRegistry)
236 b.requestsInFlight = metrics.GetOrRegisterCounter("requests-in-flight", b.metricRegistry)
237 b.protocolRequestsRate = map[int16]metrics.Meter{}
238 // Do not gather metrics for seeded broker (only used during bootstrap) because they share
239 // the same id (-1) and are already exposed through the global metrics above
240 if b.id >= 0 && !metrics.UseNilMetrics {
241 b.registerMetrics()
242 }
243
244 // Send an ApiVersionsRequest to identify the client (KIP-511).
245 // Store the response in the brokerAPIVersions map.
246 // It will be used to determine the supported API versions for each request.

Callers 15

ExampleBrokerFunction · 0.95
TestBrokerFailedRequestFunction · 0.95
TestBrokerCloseFunction · 0.95
TestBrokerFetchFunction · 0.95
TestSASLOAuthBearerFunction · 0.95
TestSASLSCRAMSHAXXXFunction · 0.95
TestSASLPlainAuthFunction · 0.95
TestSASLReadTimeoutFunction · 0.95

Calls 15

registerMetricsMethod · 0.95
maybeCloseLockedMethod · 0.95
authenticateViaSASLv0Method · 0.95
authenticateViaSASLv1Method · 0.95
closeLockedMethod · 0.95
newCleanupRegistryFunction · 0.85
withRecoverFunction · 0.85
validServerNameTLSFunction · 0.85
newBufConnFunction · 0.85
getOrRegisterHistogramFunction · 0.85
ValidateMethod · 0.80

Tested by 15

ExampleBrokerFunction · 0.76
TestBrokerFailedRequestFunction · 0.76
TestBrokerCloseFunction · 0.76
TestBrokerFetchFunction · 0.76
TestSASLOAuthBearerFunction · 0.76
TestSASLSCRAMSHAXXXFunction · 0.76
TestSASLPlainAuthFunction · 0.76
TestSASLReadTimeoutFunction · 0.76