MCPcopy
hub / github.com/IBM/sarama / sendAndReceiveSASLHandshake

Method sendAndReceiveSASLHandshake

broker.go:1535–1589  ·  view source on GitHub ↗
(saslType SASLMechanism, version int16)

Source from the content-addressed store, hash-verified

1533}
1534
1535func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int16) error {
1536 rb := &SaslHandshakeRequest{Mechanism: string(saslType), Version: version}
1537
1538 req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
1539 buf, err := encode(req, b.metricRegistry)
1540 if err != nil {
1541 return err
1542 }
1543
1544 requestTime := time.Now()
1545 // Will be decremented in updateIncomingCommunicationMetrics (except error)
1546 b.addRequestInFlightMetrics(1)
1547 bytes, err := b.write(buf)
1548 b.updateOutgoingCommunicationMetrics(bytes)
1549 if err != nil {
1550 b.addRequestInFlightMetrics(-1)
1551 Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error())
1552 return err
1553 }
1554 b.correlationID++
1555
1556 header := make([]byte, 8) // response header
1557 _, err = b.readFull(header)
1558 if err != nil {
1559 b.addRequestInFlightMetrics(-1)
1560 Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
1561 return err
1562 }
1563
1564 length := binary.BigEndian.Uint32(header[:4])
1565 payload := make([]byte, length-4)
1566 n, err := b.readFull(payload)
1567 if err != nil {
1568 b.addRequestInFlightMetrics(-1)
1569 Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
1570 return err
1571 }
1572
1573 b.updateIncomingCommunicationMetrics(n+8, time.Since(requestTime))
1574 res := &SaslHandshakeResponse{}
1575
1576 err = versionedDecode(payload, res, 0, b.metricRegistry)
1577 if err != nil {
1578 Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error())
1579 return err
1580 }
1581
1582 if !errors.Is(res.Err, ErrNoError) {
1583 Logger.Printf("Invalid SASL Mechanism : %s\n", res.Err.Error())
1584 return res.Err
1585 }
1586
1587 DebugLogger.Print("Completed pre-auth SASL handshake. Available mechanisms: ", res.EnabledMechanisms)
1588 return nil
1589}
1590
1591//
1592// In SASL Plain, Kafka expects the auth header to be in the following format

Callers 2

Calls 11

writeMethod · 0.95
readFullMethod · 0.95
encodeFunction · 0.85
versionedDecodeFunction · 0.85
IsMethod · 0.80
PrintfMethod · 0.65
ErrorMethod · 0.65
PrintMethod · 0.65

Tested by

no test coverage detected