MCPcopy
hub / github.com/IBM/sarama / sendAndReceiveSASLSCRAMv0

Method sendAndReceiveSASLSCRAMv0

broker.go:1692–1751  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

1690}
1691
1692func (b *Broker) sendAndReceiveSASLSCRAMv0() error {
1693 if err := b.sendAndReceiveSASLHandshake(b.conf.Net.SASL.Mechanism, SASLHandshakeV0); err != nil {
1694 return err
1695 }
1696
1697 scramClient := b.conf.Net.SASL.SCRAMClientGeneratorFunc()
1698 if err := scramClient.Begin(b.conf.Net.SASL.User, b.conf.Net.SASL.Password, b.conf.Net.SASL.SCRAMAuthzID); err != nil {
1699 return fmt.Errorf("failed to start SCRAM exchange with the server: %w", err)
1700 }
1701
1702 msg, err := scramClient.Step("")
1703 if err != nil {
1704 return fmt.Errorf("failed to advance the SCRAM exchange: %w", err)
1705 }
1706
1707 for !scramClient.Done() {
1708 requestTime := time.Now()
1709 // Will be decremented in updateIncomingCommunicationMetrics (except error)
1710 b.addRequestInFlightMetrics(1)
1711 length := len(msg)
1712 authBytes := make([]byte, length+4) // 4 byte length header + auth data
1713 binary.BigEndian.PutUint32(authBytes, uint32(length))
1714 copy(authBytes[4:], msg)
1715 _, err := b.write(authBytes)
1716 b.updateOutgoingCommunicationMetrics(length + 4)
1717 if err != nil {
1718 b.addRequestInFlightMetrics(-1)
1719 Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
1720 return err
1721 }
1722 b.correlationID++
1723 header := make([]byte, 4)
1724 _, err = b.readFull(header)
1725 if err != nil {
1726 b.addRequestInFlightMetrics(-1)
1727 Logger.Printf("Failed to read response header while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
1728 return err
1729 }
1730 payloadLength := binary.BigEndian.Uint32(header)
1731 if int64(payloadLength) > int64(MaxResponseSize) {
1732 return PacketDecodingError{fmt.Sprintf("SASL response of length %d too large", payloadLength)}
1733 }
1734 payload := make([]byte, int(payloadLength))
1735 n, err := b.readFull(payload)
1736 if err != nil {
1737 b.addRequestInFlightMetrics(-1)
1738 Logger.Printf("Failed to read response payload while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
1739 return err
1740 }
1741 b.updateIncomingCommunicationMetrics(n+4, time.Since(requestTime))
1742 msg, err = scramClient.Step(string(payload))
1743 if err != nil {
1744 Logger.Println("SASL authentication failed", err)
1745 return err
1746 }
1747 }
1748
1749 DebugLogger.Println("SASL authentication succeeded")

Callers 1

authenticateViaSASLv0Method · 0.95

Calls 13

writeMethod · 0.95
readFullMethod · 0.95
BeginMethod · 0.65
ErrorfMethod · 0.65
StepMethod · 0.65
DoneMethod · 0.65
PrintfMethod · 0.65
ErrorMethod · 0.65

Tested by

no test coverage detected