(b *testing.B, conf *Config, topic string, value Encoder)
| 1287 | } |
| 1288 | |
| 1289 | func benchmarkProducer(b *testing.B, conf *Config, topic string, value Encoder) { |
| 1290 | setupFunctionalTest(b) |
| 1291 | defer teardownFunctionalTest(b) |
| 1292 | |
| 1293 | metricsDisable := os.Getenv("METRICS_DISABLE") |
| 1294 | if metricsDisable != "" { |
| 1295 | previousUseNilMetrics := metrics.UseNilMetrics |
| 1296 | Logger.Println("Disabling metrics using no-op implementation") |
| 1297 | metrics.UseNilMetrics = true |
| 1298 | // Restore previous setting |
| 1299 | defer func() { |
| 1300 | metrics.UseNilMetrics = previousUseNilMetrics |
| 1301 | }() |
| 1302 | } |
| 1303 | |
| 1304 | producer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, conf) |
| 1305 | if err != nil { |
| 1306 | b.Fatal(err) |
| 1307 | } |
| 1308 | |
| 1309 | b.ResetTimer() |
| 1310 | |
| 1311 | for i := 1; i <= b.N; { |
| 1312 | msg := &ProducerMessage{Topic: topic, Key: StringEncoder(fmt.Sprintf("%d", i)), Value: value} |
| 1313 | select { |
| 1314 | case producer.Input() <- msg: |
| 1315 | i++ |
| 1316 | case ret := <-producer.Errors(): |
| 1317 | b.Fatal(ret.Err) |
| 1318 | } |
| 1319 | } |
| 1320 | safeClose(b, producer) |
| 1321 | } |
no test coverage detected