| 711 | } |
| 712 | |
| 713 | func (client *client) updateBroker(brokers []*Broker) { |
| 714 | if client.brokers == nil { |
| 715 | return |
| 716 | } |
| 717 | |
| 718 | currentBroker := make(map[int32]*Broker, len(brokers)) |
| 719 | |
| 720 | for _, broker := range brokers { |
| 721 | if broker == nil { |
| 722 | continue |
| 723 | } |
| 724 | currentBroker[broker.ID()] = broker |
| 725 | if client.brokers[broker.ID()] == nil { // add new broker |
| 726 | client.brokers[broker.ID()] = broker |
| 727 | DebugLogger.Printf("client/brokers registered new broker #%d at %s", broker.ID(), broker.Addr()) |
| 728 | } else if broker.Addr() != client.brokers[broker.ID()].Addr() { // replace broker with new address |
| 729 | safeAsyncClose(client.brokers[broker.ID()]) |
| 730 | client.brokers[broker.ID()] = broker |
| 731 | Logger.Printf("client/brokers replaced registered broker #%d with %s", broker.ID(), broker.Addr()) |
| 732 | } |
| 733 | } |
| 734 | |
| 735 | for id, broker := range client.brokers { |
| 736 | if _, exist := currentBroker[id]; !exist { // remove old broker |
| 737 | safeAsyncClose(broker) |
| 738 | delete(client.brokers, id) |
| 739 | Logger.Printf("client/broker remove invalid broker #%d with %s", broker.ID(), broker.Addr()) |
| 740 | } |
| 741 | } |
| 742 | } |
| 743 | |
| 744 | // registerBroker makes sure a broker received by a Metadata or Coordinator request is registered |
| 745 | // in the brokers map. It returns the broker that is registered, which may be the provided broker, |