MCPcopy
hub / github.com/segmentio/kafka-go / connect

Method connect

dialer.go:265–301  ·  view source on GitHub ↗

connect opens a socket connection to the broker, wraps it to create a kafka connection, and performs SASL authentication if configured to do so.

(ctx context.Context, network, address string, connCfg ConnConfig)

Source from the content-addressed store, hash-verified

263// connect opens a socket connection to the broker, wraps it to create a
264// kafka connection, and performs SASL authentication if configured to do so.
265func (d *Dialer) connect(ctx context.Context, network, address string, connCfg ConnConfig) (*Conn, error) {
266 if d.Timeout != 0 {
267 var cancel context.CancelFunc
268 ctx, cancel = context.WithTimeout(ctx, d.Timeout)
269 defer cancel()
270 }
271
272 if !d.Deadline.IsZero() {
273 var cancel context.CancelFunc
274 ctx, cancel = context.WithDeadline(ctx, d.Deadline)
275 defer cancel()
276 }
277
278 c, err := d.dialContext(ctx, network, address)
279 if err != nil {
280 return nil, fmt.Errorf("failed to dial: %w", err)
281 }
282
283 conn := NewConnWith(c, connCfg)
284
285 if d.SASLMechanism != nil {
286 host, port, err := splitHostPortNumber(address)
287 if err != nil {
288 return nil, fmt.Errorf("could not determine host/port for SASL authentication: %w", err)
289 }
290 metadata := &sasl.Metadata{
291 Host: host,
292 Port: port,
293 }
294 if err := d.authenticateSASL(sasl.WithMetadata(ctx, metadata), conn); err != nil {
295 _ = conn.Close()
296 return nil, fmt.Errorf("could not successfully authenticate to %s:%d with SASL: %w", host, port, err)
297 }
298 }
299
300 return conn, nil
301}
302
303// authenticateSASL performs all of the required requests to authenticate this
304// connection. If any step fails, this function returns with an error. A nil

Callers 3

DialContextMethod · 0.95
DialPartitionMethod · 0.95
coordinatorMethod · 0.45

Calls 7

dialContextMethod · 0.95
authenticateSASLMethod · 0.95
WithMetadataFunction · 0.92
NewConnWithFunction · 0.85
splitHostPortNumberFunction · 0.85
IsZeroMethod · 0.80
CloseMethod · 0.45

Tested by

no test coverage detected