MCPcopy
hub / github.com/IBM/sarama / runAsyncProducer

Function runAsyncProducer

tools/kafka-producer-performance/main.go:318–363  ·  view source on GitHub ↗
(topic string, partition, messageLoad, messageSize int,
	config *sarama.Config, brokers []string, throughput int)

Source from the content-addressed store, hash-verified

316}
317
318func runAsyncProducer(topic string, partition, messageLoad, messageSize int,
319 config *sarama.Config, brokers []string, throughput int) {
320 producer, err := sarama.NewAsyncProducer(brokers, config)
321 if err != nil {
322 printErrorAndExit(69, "Failed to create producer: %s", err)
323 }
324 defer func() {
325 // Print final metrics.
326 printMetrics(os.Stdout, config.MetricRegistry)
327 if err := producer.Close(); err != nil {
328 printErrorAndExit(69, "Failed to close producer: %s", err)
329 }
330 }()
331
332 messages := generateMessages(topic, partition, messageLoad, messageSize)
333
334 messagesDone := make(chan struct{})
335 go func() {
336 for range messageLoad {
337 select {
338 case <-producer.Successes():
339 case err = <-producer.Errors():
340 printErrorAndExit(69, "%s", err)
341 }
342 }
343 messagesDone <- struct{}{}
344 }()
345
346 if throughput > 0 {
347 ticker := time.NewTicker(time.Second)
348 for idx, message := range messages {
349 producer.Input() <- message
350 if (idx+1)%throughput == 0 {
351 <-ticker.C
352 }
353 }
354 ticker.Stop()
355 } else {
356 for _, message := range messages {
357 producer.Input() <- message
358 }
359 }
360
361 <-messagesDone
362 close(messagesDone)
363}
364
365func runSyncProducer(topic string, partition, messageLoad, messageSize, routines int,
366 config *sarama.Config, brokers []string, throughput int) {

Callers 1

mainFunction · 0.85

Calls 9

CloseMethod · 0.95
SuccessesMethod · 0.95
ErrorsMethod · 0.95
InputMethod · 0.95
NewAsyncProducerFunction · 0.92
printMetricsFunction · 0.85
generateMessagesFunction · 0.85
StopMethod · 0.80
printErrorAndExitFunction · 0.70

Tested by

no test coverage detected