(broker *Broker, bp *brokerProducer)
| 1753 | } |
| 1754 | |
| 1755 | func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp *brokerProducer) { |
| 1756 | p.brokerLock.Lock() |
| 1757 | defer p.brokerLock.Unlock() |
| 1758 | |
| 1759 | p.brokerRefs[bp]-- |
| 1760 | if p.brokerRefs[bp] == 0 { |
| 1761 | close(bp.input) |
| 1762 | delete(p.brokerRefs, bp) |
| 1763 | |
| 1764 | if p.brokers[broker] == bp { |
| 1765 | delete(p.brokers, broker) |
| 1766 | } |
| 1767 | } |
| 1768 | } |
| 1769 | |
| 1770 | func (p *asyncProducer) abandonBrokerConnection(broker *Broker) { |
| 1771 | p.brokerLock.Lock() |
no outgoing calls