MCPcopy
hub / github.com/IBM/sarama / main

Function main

tools/kafka-producer-performance/main.go:219–316  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

217}
218
219func main() {
220 flag.Parse()
221
222 if *brokers == "" {
223 printUsageErrorAndExit("-brokers is required")
224 }
225 if *topic == "" {
226 printUsageErrorAndExit("-topic is required")
227 }
228 if *messageLoad <= 0 {
229 printUsageErrorAndExit("-message-load must be greater than 0")
230 }
231 if *messageSize <= 0 {
232 printUsageErrorAndExit("-message-size must be greater than 0")
233 }
234 if *routines < 1 || *routines > *messageLoad {
235 printUsageErrorAndExit("-routines must be greater than 0 and less than or equal to -message-load")
236 }
237 if *securityProtocol != "PLAINTEXT" && *securityProtocol != "SSL" {
238 printUsageErrorAndExit(fmt.Sprintf("-security-protocol %q is not supported", *securityProtocol))
239 }
240 if *verbose {
241 sarama.Logger = log.New(os.Stderr, "", log.LstdFlags)
242 }
243
244 config := sarama.NewConfig()
245
246 config.Net.MaxOpenRequests = *maxOpenRequests
247 config.Producer.MaxMessageBytes = *maxMessageBytes
248 config.Producer.RequiredAcks = sarama.RequiredAcks(*requiredAcks)
249 config.Producer.Timeout = *timeout
250 config.Producer.Partitioner = parsePartitioner(*partitioner, *partition)
251 config.Producer.Compression = parseCompression(*compression)
252 config.Producer.Flush.Frequency = *flushFrequency
253 config.Producer.Flush.Bytes = *flushBytes
254 config.Producer.Flush.Messages = *flushMessages
255 config.Producer.Flush.MaxMessages = *flushMaxMessages
256 config.Producer.Return.Successes = true
257 config.ClientID = *clientID
258 config.ChannelBufferSize = *channelBufferSize
259 config.Version = parseVersion(*version)
260
261 if *securityProtocol == "SSL" {
262 tlsConfig, err := tls.NewConfig(*tlsClientCert, *tlsClientKey)
263 if err != nil {
264 printErrorAndExit(69, "failed to load client certificate from: %s and private key from: %s: %v",
265 *tlsClientCert, *tlsClientKey, err)
266 }
267
268 if *tlsRootCACerts != "" {
269 rootCAsBytes, err := os.ReadFile(*tlsRootCACerts)
270 if err != nil {
271 printErrorAndExit(69, "failed to read root CA certificates: %v", err)
272 }
273 certPool := x509.NewCertPool()
274 if !certPool.AppendCertsFromPEM(rootCAsBytes) {
275 printErrorAndExit(69, "failed to load root CA certificates from file: %s", *tlsRootCACerts)
276 }

Callers

nothing calls this directly

Calls 13

ValidateMethod · 0.95
NewConfigFunction · 0.92
RequiredAcksTypeAlias · 0.92
NewConfigFunction · 0.92
parsePartitionerFunction · 0.85
parseCompressionFunction · 0.85
parseVersionFunction · 0.85
printMetricsFunction · 0.85
runSyncProducerFunction · 0.85
runAsyncProducerFunction · 0.85
printUsageErrorAndExitFunction · 0.70
printErrorAndExitFunction · 0.70

Tested by

no test coverage detected