connect opens a socket connection to the broker, wraps it to create a kafka connection, and performs SASL authentication if configured to do so.
(ctx context.Context, network, address string, connCfg ConnConfig)
| 263 | // connect opens a socket connection to the broker, wraps it to create a |
| 264 | // kafka connection, and performs SASL authentication if configured to do so. |
| 265 | func (d *Dialer) connect(ctx context.Context, network, address string, connCfg ConnConfig) (*Conn, error) { |
| 266 | if d.Timeout != 0 { |
| 267 | var cancel context.CancelFunc |
| 268 | ctx, cancel = context.WithTimeout(ctx, d.Timeout) |
| 269 | defer cancel() |
| 270 | } |
| 271 | |
| 272 | if !d.Deadline.IsZero() { |
| 273 | var cancel context.CancelFunc |
| 274 | ctx, cancel = context.WithDeadline(ctx, d.Deadline) |
| 275 | defer cancel() |
| 276 | } |
| 277 | |
| 278 | c, err := d.dialContext(ctx, network, address) |
| 279 | if err != nil { |
| 280 | return nil, fmt.Errorf("failed to dial: %w", err) |
| 281 | } |
| 282 | |
| 283 | conn := NewConnWith(c, connCfg) |
| 284 | |
| 285 | if d.SASLMechanism != nil { |
| 286 | host, port, err := splitHostPortNumber(address) |
| 287 | if err != nil { |
| 288 | return nil, fmt.Errorf("could not determine host/port for SASL authentication: %w", err) |
| 289 | } |
| 290 | metadata := &sasl.Metadata{ |
| 291 | Host: host, |
| 292 | Port: port, |
| 293 | } |
| 294 | if err := d.authenticateSASL(sasl.WithMetadata(ctx, metadata), conn); err != nil { |
| 295 | _ = conn.Close() |
| 296 | return nil, fmt.Errorf("could not successfully authenticate to %s:%d with SASL: %w", host, port, err) |
| 297 | } |
| 298 | } |
| 299 | |
| 300 | return conn, nil |
| 301 | } |
| 302 | |
| 303 | // authenticateSASL performs all of the required requests to authenticate this |
| 304 | // connection. If any step fails, this function returns with an error. A nil |
no test coverage detected