(topic string, partition, messageLoad, messageSize int, config *sarama.Config, brokers []string, throughput int)
| 316 | } |
| 317 | |
| 318 | func runAsyncProducer(topic string, partition, messageLoad, messageSize int, |
| 319 | config *sarama.Config, brokers []string, throughput int) { |
| 320 | producer, err := sarama.NewAsyncProducer(brokers, config) |
| 321 | if err != nil { |
| 322 | printErrorAndExit(69, "Failed to create producer: %s", err) |
| 323 | } |
| 324 | defer func() { |
| 325 | // Print final metrics. |
| 326 | printMetrics(os.Stdout, config.MetricRegistry) |
| 327 | if err := producer.Close(); err != nil { |
| 328 | printErrorAndExit(69, "Failed to close producer: %s", err) |
| 329 | } |
| 330 | }() |
| 331 | |
| 332 | messages := generateMessages(topic, partition, messageLoad, messageSize) |
| 333 | |
| 334 | messagesDone := make(chan struct{}) |
| 335 | go func() { |
| 336 | for range messageLoad { |
| 337 | select { |
| 338 | case <-producer.Successes(): |
| 339 | case err = <-producer.Errors(): |
| 340 | printErrorAndExit(69, "%s", err) |
| 341 | } |
| 342 | } |
| 343 | messagesDone <- struct{}{} |
| 344 | }() |
| 345 | |
| 346 | if throughput > 0 { |
| 347 | ticker := time.NewTicker(time.Second) |
| 348 | for idx, message := range messages { |
| 349 | producer.Input() <- message |
| 350 | if (idx+1)%throughput == 0 { |
| 351 | <-ticker.C |
| 352 | } |
| 353 | } |
| 354 | ticker.Stop() |
| 355 | } else { |
| 356 | for _, message := range messages { |
| 357 | producer.Input() <- message |
| 358 | } |
| 359 | } |
| 360 | |
| 361 | <-messagesDone |
| 362 | close(messagesDone) |
| 363 | } |
| 364 | |
| 365 | func runSyncProducer(topic string, partition, messageLoad, messageSize, routines int, |
| 366 | config *sarama.Config, brokers []string, throughput int) { |
no test coverage detected