MCPcopy
hub / github.com/segmentio/kafka-go / grabConnOrConnect

Method grabConnOrConnect

transport.go:982–1049  ·  view source on GitHub ↗
(ctx context.Context)

Source from the content-addressed store, hash-verified

980}
981
982func (g *connGroup) grabConnOrConnect(ctx context.Context) (*conn, error) {
983 rslv := g.pool.resolver
984 addr := g.addr
985 var c *conn
986
987 if rslv == nil {
988 c = g.grabConn()
989 } else {
990 var err error
991 broker := g.broker
992
993 if broker.ID < 0 {
994 host, port, err := splitHostPortNumber(addr.String())
995 if err != nil {
996 return nil, err
997 }
998 broker.Host = host
999 broker.Port = port
1000 }
1001
1002 ipAddrs, err := rslv.LookupBrokerIPAddr(ctx, broker)
1003 if err != nil {
1004 return nil, err
1005 }
1006
1007 for _, ipAddr := range ipAddrs {
1008 network := addr.Network()
1009 address := net.JoinHostPort(ipAddr.String(), strconv.Itoa(broker.Port))
1010
1011 if c = g.grabConnTo(network, address); c != nil {
1012 break
1013 }
1014 }
1015 }
1016
1017 if c == nil {
1018 connChan := make(chan *conn)
1019 errChan := make(chan error)
1020
1021 go func() {
1022 c, err := g.connect(ctx, addr)
1023 if err != nil {
1024 select {
1025 case errChan <- err:
1026 case <-ctx.Done():
1027 }
1028 } else {
1029 select {
1030 case connChan <- c:
1031 case <-ctx.Done():
1032 if !g.releaseConn(c) {
1033 c.close()
1034 }
1035 }
1036 }
1037 }()
1038
1039 select {

Callers 2

grabBrokerConn · Method · 0.80
grabClusterConn · Method · 0.80

Calls 11

grabConn · Method · 0.95
grabConnTo · Method · 0.95
connect · Method · 0.95
releaseConn · Method · 0.95
close · Method · 0.95
splitHostPortNumber · Function · 0.85
Done · Method · 0.80
LookupBrokerIPAddr · Method · 0.65
String · Method · 0.45
Network · Method · 0.45
Err · Method · 0.45

Tested by

no test coverage detected