closeLocked closes the broker connection and resets state. NOTE: caller must hold b.lock.
()
| 394 | // closeLocked closes the broker connection and resets state. |
| 395 | // NOTE: caller must hold b.lock. |
| 396 | func (b *Broker) closeLocked() error { |
| 397 | if b.conn == nil { |
| 398 | return ErrNotConnected |
| 399 | } |
| 400 | |
| 401 | if b.responses != nil { |
| 402 | close(b.responses) |
| 403 | } |
| 404 | // close the socket before waiting so in-flight reads can exit |
| 405 | err := b.conn.Close() |
| 406 | if b.done != nil { |
| 407 | <-b.done |
| 408 | } |
| 409 | |
| 410 | b.conn = nil |
| 411 | b.responses = nil |
| 412 | b.done = nil |
| 413 | |
| 414 | b.metricRegistry.UnregisterAll() |
| 415 | |
| 416 | if err == nil { |
| 417 | DebugLogger.Printf("Closed connection to broker %s\n", b.addr) |
| 418 | } else { |
| 419 | Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err) |
| 420 | } |
| 421 | b.opened.Store(false) |
| 422 | |
| 423 | return err |
| 424 | } |
| 425 | |
| 426 | // ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known. |
| 427 | func (b *Broker) ID() int32 { |
no test coverage detected