Open tries to connect to the Broker if it is not already connected or connecting, but does not block waiting for the connection to complete. This means that any subsequent operations on the broker will block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call, follow it by a call to Connected().
(conf *Config)
| 187 | // follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or |
| 188 | // AlreadyConnected. If conf is nil, the result of NewConfig() is used. |
| 189 | func (b *Broker) Open(conf *Config) error { |
| 190 | if !b.opened.CompareAndSwap(false, true) { |
| 191 | return ErrAlreadyConnected |
| 192 | } |
| 193 | |
| 194 | if conf == nil { |
| 195 | conf = NewConfig() |
| 196 | } |
| 197 | |
| 198 | err := conf.Validate() |
| 199 | if err != nil { |
| 200 | return err |
| 201 | } |
| 202 | |
| 203 | b.lock.Lock() |
| 204 | |
| 205 | if b.metricRegistry == nil { |
| 206 | b.metricRegistry = newCleanupRegistry(conf.MetricRegistry) |
| 207 | } |
| 208 | |
| 209 | go withRecover(func() { |
| 210 | defer b.lock.Unlock() |
| 211 | |
| 212 | dialer := conf.getDialer() |
| 213 | b.conn, b.connErr = dialer.Dial("tcp", b.addr) |
| 214 | if b.connErr != nil { |
| 215 | Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr) |
| 216 | b.conn = nil |
| 217 | b.opened.Store(false) |
| 218 | return |
| 219 | } |
| 220 | if conf.Net.TLS.Enable { |
| 221 | b.conn = tls.Client(b.conn, validServerNameTLS(b.addr, conf.Net.TLS.Config)) |
| 222 | } |
| 223 | |
| 224 | b.conn = newBufConn(b.conn) |
| 225 | b.conf = conf |
| 226 | |
| 227 | // Create or reuse the global metrics shared between brokers |
| 228 | b.incomingByteRate = metrics.GetOrRegisterMeter("incoming-byte-rate", b.metricRegistry) |
| 229 | b.requestRate = metrics.GetOrRegisterMeter("request-rate", b.metricRegistry) |
| 230 | b.fetchRate = metrics.GetOrRegisterMeter("consumer-fetch-rate", b.metricRegistry) |
| 231 | b.requestSize = getOrRegisterHistogram("request-size", b.metricRegistry) |
| 232 | b.requestLatency = getOrRegisterHistogram("request-latency-in-ms", b.metricRegistry) |
| 233 | b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", b.metricRegistry) |
| 234 | b.responseRate = metrics.GetOrRegisterMeter("response-rate", b.metricRegistry) |
| 235 | b.responseSize = getOrRegisterHistogram("response-size", b.metricRegistry) |
| 236 | b.requestsInFlight = metrics.GetOrRegisterCounter("requests-in-flight", b.metricRegistry) |
| 237 | b.protocolRequestsRate = map[int16]metrics.Meter{} |
| 238 | // Do not gather metrics for seeded broker (only used during bootstrap) because they share |
| 239 | // the same id (-1) and are already exposed through the global metrics above |
| 240 | if b.id >= 0 && !metrics.UseNilMetrics { |
| 241 | b.registerMetrics() |
| 242 | } |
| 243 | |
| 244 | // Send an ApiVersionsRequest to identify the client (KIP-511). |
| 245 | // Store the response in the brokerAPIVersions map. |
| 246 | // It will be used to determine the supported API versions for each request. |