MCPcopy
hub / github.com/segmentio/kafka-go / grabPool

Method grabPool

transport.go:212–266  ·  view source on GitHub ↗
(addr net.Addr)

Source from the content-addressed store, hash-verified

210}
211
212func (t *Transport) grabPool(addr net.Addr) *connPool {
213 k := networkAddress{
214 network: addr.Network(),
215 address: addr.String(),
216 }
217
218 t.mutex.RLock()
219 p := t.pools[k]
220 if p != nil {
221 p.ref()
222 }
223 t.mutex.RUnlock()
224
225 if p != nil {
226 return p
227 }
228
229 t.mutex.Lock()
230 defer t.mutex.Unlock()
231
232 if p := t.pools[k]; p != nil {
233 p.ref()
234 return p
235 }
236
237 ctx, cancel := context.WithCancel(t.context())
238
239 p = &connPool{
240 refc: 2,
241
242 dial: t.dial(),
243 dialTimeout: t.dialTimeout(),
244 idleTimeout: t.idleTimeout(),
245 metadataTTL: t.metadataTTL(),
246 metadataTopics: t.MetadataTopics,
247 clientID: t.ClientID,
248 tls: t.TLS,
249 sasl: t.SASL,
250 resolver: t.Resolver,
251
252 ready: make(event),
253 wake: make(chan event),
254 conns: make(map[int32]*connGroup),
255 cancel: cancel,
256 }
257
258 p.ctrl = p.newConnGroup(addr)
259 go p.discover(ctx, p.wake)
260
261 if t.pools == nil {
262 t.pools = make(map[networkAddress]*connPool)
263 }
264 t.pools[k] = p
265 return p
266}
267
268func (t *Transport) context() context.Context {
269 if t.Context != nil {

Callers 1

RoundTrip (Method) · 0.95

Calls 10

context (Method) · 0.95
dial (Method) · 0.95
dialTimeout (Method) · 0.95
idleTimeout (Method) · 0.95
metadataTTL (Method) · 0.95
newConnGroup (Method) · 0.80
discover (Method) · 0.80
Network (Method) · 0.45
String (Method) · 0.45
ref (Method) · 0.45

Tested by

no test coverage detected