(addr net.Addr)
| 210 | } |
| 211 | |
| 212 | func (t *Transport) grabPool(addr net.Addr) *connPool { |
| 213 | k := networkAddress{ |
| 214 | network: addr.Network(), |
| 215 | address: addr.String(), |
| 216 | } |
| 217 | |
| 218 | t.mutex.RLock() |
| 219 | p := t.pools[k] |
| 220 | if p != nil { |
| 221 | p.ref() |
| 222 | } |
| 223 | t.mutex.RUnlock() |
| 224 | |
| 225 | if p != nil { |
| 226 | return p |
| 227 | } |
| 228 | |
| 229 | t.mutex.Lock() |
| 230 | defer t.mutex.Unlock() |
| 231 | |
| 232 | if p := t.pools[k]; p != nil { |
| 233 | p.ref() |
| 234 | return p |
| 235 | } |
| 236 | |
| 237 | ctx, cancel := context.WithCancel(t.context()) |
| 238 | |
| 239 | p = &connPool{ |
| 240 | refc: 2, |
| 241 | |
| 242 | dial: t.dial(), |
| 243 | dialTimeout: t.dialTimeout(), |
| 244 | idleTimeout: t.idleTimeout(), |
| 245 | metadataTTL: t.metadataTTL(), |
| 246 | metadataTopics: t.MetadataTopics, |
| 247 | clientID: t.ClientID, |
| 248 | tls: t.TLS, |
| 249 | sasl: t.SASL, |
| 250 | resolver: t.Resolver, |
| 251 | |
| 252 | ready: make(event), |
| 253 | wake: make(chan event), |
| 254 | conns: make(map[int32]*connGroup), |
| 255 | cancel: cancel, |
| 256 | } |
| 257 | |
| 258 | p.ctrl = p.newConnGroup(addr) |
| 259 | go p.discover(ctx, p.wake) |
| 260 | |
| 261 | if t.pools == nil { |
| 262 | t.pools = make(map[networkAddress]*connPool) |
| 263 | } |
| 264 | t.pools[k] = p |
| 265 | return p |
| 266 | } |
| 267 | |
| 268 | func (t *Transport) context() context.Context { |
| 269 | if t.Context != nil { |
no test coverage detected