()
| 1690 | } |
| 1691 | |
| 1692 | func (b *Broker) sendAndReceiveSASLSCRAMv0() error { |
| 1693 | if err := b.sendAndReceiveSASLHandshake(b.conf.Net.SASL.Mechanism, SASLHandshakeV0); err != nil { |
| 1694 | return err |
| 1695 | } |
| 1696 | |
| 1697 | scramClient := b.conf.Net.SASL.SCRAMClientGeneratorFunc() |
| 1698 | if err := scramClient.Begin(b.conf.Net.SASL.User, b.conf.Net.SASL.Password, b.conf.Net.SASL.SCRAMAuthzID); err != nil { |
| 1699 | return fmt.Errorf("failed to start SCRAM exchange with the server: %w", err) |
| 1700 | } |
| 1701 | |
| 1702 | msg, err := scramClient.Step("") |
| 1703 | if err != nil { |
| 1704 | return fmt.Errorf("failed to advance the SCRAM exchange: %w", err) |
| 1705 | } |
| 1706 | |
| 1707 | for !scramClient.Done() { |
| 1708 | requestTime := time.Now() |
| 1709 | // Will be decremented in updateIncomingCommunicationMetrics (except error) |
| 1710 | b.addRequestInFlightMetrics(1) |
| 1711 | length := len(msg) |
| 1712 | authBytes := make([]byte, length+4) // 4 byte length header + auth data |
| 1713 | binary.BigEndian.PutUint32(authBytes, uint32(length)) |
| 1714 | copy(authBytes[4:], msg) |
| 1715 | _, err := b.write(authBytes) |
| 1716 | b.updateOutgoingCommunicationMetrics(length + 4) |
| 1717 | if err != nil { |
| 1718 | b.addRequestInFlightMetrics(-1) |
| 1719 | Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error()) |
| 1720 | return err |
| 1721 | } |
| 1722 | b.correlationID++ |
| 1723 | header := make([]byte, 4) |
| 1724 | _, err = b.readFull(header) |
| 1725 | if err != nil { |
| 1726 | b.addRequestInFlightMetrics(-1) |
| 1727 | Logger.Printf("Failed to read response header while authenticating with SASL to broker %s: %s\n", b.addr, err.Error()) |
| 1728 | return err |
| 1729 | } |
| 1730 | payloadLength := binary.BigEndian.Uint32(header) |
| 1731 | if int64(payloadLength) > int64(MaxResponseSize) { |
| 1732 | return PacketDecodingError{fmt.Sprintf("SASL response of length %d too large", payloadLength)} |
| 1733 | } |
| 1734 | payload := make([]byte, int(payloadLength)) |
| 1735 | n, err := b.readFull(payload) |
| 1736 | if err != nil { |
| 1737 | b.addRequestInFlightMetrics(-1) |
| 1738 | Logger.Printf("Failed to read response payload while authenticating with SASL to broker %s: %s\n", b.addr, err.Error()) |
| 1739 | return err |
| 1740 | } |
| 1741 | b.updateIncomingCommunicationMetrics(n+4, time.Since(requestTime)) |
| 1742 | msg, err = scramClient.Step(string(payload)) |
| 1743 | if err != nil { |
| 1744 | Logger.Println("SASL authentication failed", err) |
| 1745 | return err |
| 1746 | } |
| 1747 | } |
| 1748 | |
| 1749 | DebugLogger.Println("SASL authentication succeeded") |
no test coverage detected