(topic string, partition, messageLoad, messageSize, routines int, config *sarama.Config, brokers []string, throughput int)
| 363 | } |
| 364 | |
| 365 | func runSyncProducer(topic string, partition, messageLoad, messageSize, routines int, |
| 366 | config *sarama.Config, brokers []string, throughput int) { |
| 367 | producer, err := sarama.NewSyncProducer(brokers, config) |
| 368 | if err != nil { |
| 369 | printErrorAndExit(69, "Failed to create producer: %s", err) |
| 370 | } |
| 371 | defer func() { |
| 372 | // Print final metrics. |
| 373 | printMetrics(os.Stdout, config.MetricRegistry) |
| 374 | if err := producer.Close(); err != nil { |
| 375 | printErrorAndExit(69, "Failed to close producer: %s", err) |
| 376 | } |
| 377 | }() |
| 378 | |
| 379 | messages := make([][]*sarama.ProducerMessage, routines) |
| 380 | for i := range routines { |
| 381 | if i == routines-1 { |
| 382 | messages[i] = generateMessages(topic, partition, messageLoad/routines+messageLoad%routines, messageSize) |
| 383 | } else { |
| 384 | messages[i] = generateMessages(topic, partition, messageLoad/routines, messageSize) |
| 385 | } |
| 386 | } |
| 387 | |
| 388 | var wg gosync.WaitGroup |
| 389 | if throughput > 0 { |
| 390 | for _, messages := range messages { |
| 391 | wg.Go(func() { |
| 392 | ticker := time.NewTicker(time.Second) |
| 393 | for _, message := range messages { |
| 394 | for range throughput { |
| 395 | _, _, err = producer.SendMessage(message) |
| 396 | if err != nil { |
| 397 | printErrorAndExit(69, "Failed to send message: %s", err) |
| 398 | } |
| 399 | } |
| 400 | <-ticker.C |
| 401 | } |
| 402 | ticker.Stop() |
| 403 | }) |
| 404 | } |
| 405 | } else { |
| 406 | for _, messages := range messages { |
| 407 | wg.Go(func() { |
| 408 | for _, message := range messages { |
| 409 | _, _, err = producer.SendMessage(message) |
| 410 | if err != nil { |
| 411 | printErrorAndExit(69, "Failed to send message: %s", err) |
| 412 | } |
| 413 | } |
| 414 | }) |
| 415 | } |
| 416 | } |
| 417 | wg.Wait() |
| 418 | } |
| 419 | |
| 420 | func printMetrics(w io.Writer, r metrics.Registry) { |
| 421 | recordSendRateMetric := r.Get("record-send-rate") |
no test coverage detected