()
| 217 | } |
| 218 | |
| 219 | func main() { |
| 220 | flag.Parse() |
| 221 | |
| 222 | if *brokers == "" { |
| 223 | printUsageErrorAndExit("-brokers is required") |
| 224 | } |
| 225 | if *topic == "" { |
| 226 | printUsageErrorAndExit("-topic is required") |
| 227 | } |
| 228 | if *messageLoad <= 0 { |
| 229 | printUsageErrorAndExit("-message-load must be greater than 0") |
| 230 | } |
| 231 | if *messageSize <= 0 { |
| 232 | printUsageErrorAndExit("-message-size must be greater than 0") |
| 233 | } |
| 234 | if *routines < 1 || *routines > *messageLoad { |
| 235 | printUsageErrorAndExit("-routines must be greater than 0 and less than or equal to -message-load") |
| 236 | } |
| 237 | if *securityProtocol != "PLAINTEXT" && *securityProtocol != "SSL" { |
| 238 | printUsageErrorAndExit(fmt.Sprintf("-security-protocol %q is not supported", *securityProtocol)) |
| 239 | } |
| 240 | if *verbose { |
| 241 | sarama.Logger = log.New(os.Stderr, "", log.LstdFlags) |
| 242 | } |
| 243 | |
| 244 | config := sarama.NewConfig() |
| 245 | |
| 246 | config.Net.MaxOpenRequests = *maxOpenRequests |
| 247 | config.Producer.MaxMessageBytes = *maxMessageBytes |
| 248 | config.Producer.RequiredAcks = sarama.RequiredAcks(*requiredAcks) |
| 249 | config.Producer.Timeout = *timeout |
| 250 | config.Producer.Partitioner = parsePartitioner(*partitioner, *partition) |
| 251 | config.Producer.Compression = parseCompression(*compression) |
| 252 | config.Producer.Flush.Frequency = *flushFrequency |
| 253 | config.Producer.Flush.Bytes = *flushBytes |
| 254 | config.Producer.Flush.Messages = *flushMessages |
| 255 | config.Producer.Flush.MaxMessages = *flushMaxMessages |
| 256 | config.Producer.Return.Successes = true |
| 257 | config.ClientID = *clientID |
| 258 | config.ChannelBufferSize = *channelBufferSize |
| 259 | config.Version = parseVersion(*version) |
| 260 | |
| 261 | if *securityProtocol == "SSL" { |
| 262 | tlsConfig, err := tls.NewConfig(*tlsClientCert, *tlsClientKey) |
| 263 | if err != nil { |
| 264 | printErrorAndExit(69, "failed to load client certificate from: %s and private key from: %s: %v", |
| 265 | *tlsClientCert, *tlsClientKey, err) |
| 266 | } |
| 267 | |
| 268 | if *tlsRootCACerts != "" { |
| 269 | rootCAsBytes, err := os.ReadFile(*tlsRootCACerts) |
| 270 | if err != nil { |
| 271 | printErrorAndExit(69, "failed to read root CA certificates: %v", err) |
| 272 | } |
| 273 | certPool := x509.NewCertPool() |
| 274 | if !certPool.AppendCertsFromPEM(rootCAsBytes) { |
| 275 | printErrorAndExit(69, "failed to load root CA certificates from file: %s", *tlsRootCACerts) |
| 276 | } |
Nothing calls this directly.
No test coverage detected.