(saslType SASLMechanism, version int16)
| 1533 | } |
| 1534 | |
| 1535 | func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int16) error { |
| 1536 | rb := &SaslHandshakeRequest{Mechanism: string(saslType), Version: version} |
| 1537 | |
| 1538 | req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb} |
| 1539 | buf, err := encode(req, b.metricRegistry) |
| 1540 | if err != nil { |
| 1541 | return err |
| 1542 | } |
| 1543 | |
| 1544 | requestTime := time.Now() |
| 1545 | // Will be decremented in updateIncomingCommunicationMetrics (except error) |
| 1546 | b.addRequestInFlightMetrics(1) |
| 1547 | bytes, err := b.write(buf) |
| 1548 | b.updateOutgoingCommunicationMetrics(bytes) |
| 1549 | if err != nil { |
| 1550 | b.addRequestInFlightMetrics(-1) |
| 1551 | Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error()) |
| 1552 | return err |
| 1553 | } |
| 1554 | b.correlationID++ |
| 1555 | |
| 1556 | header := make([]byte, 8) // response header |
| 1557 | _, err = b.readFull(header) |
| 1558 | if err != nil { |
| 1559 | b.addRequestInFlightMetrics(-1) |
| 1560 | Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error()) |
| 1561 | return err |
| 1562 | } |
| 1563 | |
| 1564 | length := binary.BigEndian.Uint32(header[:4]) |
| 1565 | payload := make([]byte, length-4) |
| 1566 | n, err := b.readFull(payload) |
| 1567 | if err != nil { |
| 1568 | b.addRequestInFlightMetrics(-1) |
| 1569 | Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error()) |
| 1570 | return err |
| 1571 | } |
| 1572 | |
| 1573 | b.updateIncomingCommunicationMetrics(n+8, time.Since(requestTime)) |
| 1574 | res := &SaslHandshakeResponse{} |
| 1575 | |
| 1576 | err = versionedDecode(payload, res, 0, b.metricRegistry) |
| 1577 | if err != nil { |
| 1578 | Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error()) |
| 1579 | return err |
| 1580 | } |
| 1581 | |
| 1582 | if !errors.Is(res.Err, ErrNoError) { |
| 1583 | Logger.Printf("Invalid SASL Mechanism : %s\n", res.Err.Error()) |
| 1584 | return res.Err |
| 1585 | } |
| 1586 | |
| 1587 | DebugLogger.Print("Completed pre-auth SASL handshake. Available mechanisms: ", res.EnabledMechanisms) |
| 1588 | return nil |
| 1589 | } |
| 1590 | |
| 1591 | // |
| 1592 | // In SASL Plain, Kafka expects the auth header to be in the following format |
no test coverage detected