MCPcopy
hub / github.com/IBM/sarama / runSyncProducer

Function runSyncProducer

tools/kafka-producer-performance/main.go:365–418  ·  view source on GitHub ↗
(topic string, partition, messageLoad, messageSize, routines int,
	config *sarama.Config, brokers []string, throughput int)

Source from the content-addressed store, hash-verified

363}
364
365func runSyncProducer(topic string, partition, messageLoad, messageSize, routines int,
366 config *sarama.Config, brokers []string, throughput int) {
367 producer, err := sarama.NewSyncProducer(brokers, config)
368 if err != nil {
369 printErrorAndExit(69, "Failed to create producer: %s", err)
370 }
371 defer func() {
372 // Print final metrics.
373 printMetrics(os.Stdout, config.MetricRegistry)
374 if err := producer.Close(); err != nil {
375 printErrorAndExit(69, "Failed to close producer: %s", err)
376 }
377 }()
378
379 messages := make([][]*sarama.ProducerMessage, routines)
380 for i := range routines {
381 if i == routines-1 {
382 messages[i] = generateMessages(topic, partition, messageLoad/routines+messageLoad%routines, messageSize)
383 } else {
384 messages[i] = generateMessages(topic, partition, messageLoad/routines, messageSize)
385 }
386 }
387
388 var wg gosync.WaitGroup
389 if throughput > 0 {
390 for _, messages := range messages {
391 wg.Go(func() {
392 ticker := time.NewTicker(time.Second)
393 for _, message := range messages {
394 for range throughput {
395 _, _, err = producer.SendMessage(message)
396 if err != nil {
397 printErrorAndExit(69, "Failed to send message: %s", err)
398 }
399 }
400 <-ticker.C
401 }
402 ticker.Stop()
403 })
404 }
405 } else {
406 for _, messages := range messages {
407 wg.Go(func() {
408 for _, message := range messages {
409 _, _, err = producer.SendMessage(message)
410 if err != nil {
411 printErrorAndExit(69, "Failed to send message: %s", err)
412 }
413 }
414 })
415 }
416 }
417 wg.Wait()
418}
419
420func printMetrics(w io.Writer, r metrics.Registry) {
421 recordSendRateMetric := r.Get("record-send-rate")

Callers 1

mainFunction · 0.85

Calls 7

CloseMethod · 0.95
SendMessageMethod · 0.95
NewSyncProducerFunction · 0.92
printMetricsFunction · 0.85
generateMessagesFunction · 0.85
StopMethod · 0.80
printErrorAndExitFunction · 0.70

Tested by

no test coverage detected