MCPcopy
hub / github.com/segmentio/kafka-go / connect

Method connect

transport.go:1140–1239  ·  view source on GitHub ↗
(ctx context.Context, addr net.Addr)

Source from the content-addressed store, hash-verified

1138}
1139
1140func (g *connGroup) connect(ctx context.Context, addr net.Addr) (*conn, error) {
1141 deadline := time.Now().Add(g.pool.dialTimeout)
1142
1143 ctx, cancel := context.WithDeadline(ctx, deadline)
1144 defer cancel()
1145
1146 network := strings.Split(addr.Network(), ",")
1147 address := strings.Split(addr.String(), ",")
1148 var netConn net.Conn
1149 var netAddr net.Addr
1150 var err error
1151
1152 if len(address) > 1 {
1153 // Shuffle the list of addresses to randomize the order in which
1154 // connections are attempted. This prevents routing all connections
1155 // to the first broker (which will usually succeed).
1156 rand.Shuffle(len(address), func(i, j int) {
1157 network[i], network[j] = network[j], network[i]
1158 address[i], address[j] = address[j], address[i]
1159 })
1160 }
1161
1162 for i := range address {
1163 netConn, err = g.pool.dial(ctx, network[i], address[i])
1164 if err == nil {
1165 netAddr = &networkAddress{
1166 network: network[i],
1167 address: address[i],
1168 }
1169 break
1170 }
1171 }
1172
1173 if err != nil {
1174 return nil, err
1175 }
1176
1177 defer func() {
1178 if netConn != nil {
1179 netConn.Close()
1180 }
1181 }()
1182
1183 if tlsConfig := g.pool.tls; tlsConfig != nil {
1184 if tlsConfig.ServerName == "" {
1185 host, _ := splitHostPort(netAddr.String())
1186 tlsConfig = tlsConfig.Clone()
1187 tlsConfig.ServerName = host
1188 }
1189 netConn = tls.Client(netConn, tlsConfig)
1190 }
1191
1192 pc := protocol.NewConn(netConn, g.pool.clientID)
1193 pc.SetDeadline(deadline)
1194
1195 r, err := pc.RoundTrip(new(apiversions.Request))
1196 if err != nil {
1197 return nil, err

Callers 2

TestIssue477 · Function · 0.95
grabConnOrConnect · Method · 0.95

Calls 15

Close · Method · 0.95
SetDeadline · Method · 0.95
RoundTrip · Method · 0.95
SetVersions · Method · 0.95
run · Method · 0.95
NewConn · Function · 0.92
ApiKey · TypeAlias · 0.92
WithMetadata · Function · 0.92
splitHostPort · Function · 0.85
splitHostPortNumber · Function · 0.85
authenticateSASL · Function · 0.85
dial · Method · 0.80

Tested by 1

TestIssue477 · Function · 0.76