(brokerList []string, version sarama.KafkaVersion)
| 221 | } |
| 222 | |
| 223 | func newAccessLogProducer(brokerList []string, version sarama.KafkaVersion) sarama.AsyncProducer { |
| 224 | // For the access log, we are looking for AP semantics, with high throughput. |
| 225 | // By creating batches of compressed messages, we reduce network I/O at a cost of more latency. |
| 226 | config := sarama.NewConfig() |
| 227 | config.Version = version |
| 228 | tlsConfig := createTlsConfiguration() |
| 229 | if tlsConfig != nil { |
| 230 | config.Net.TLS.Enable = true |
| 231 | config.Net.TLS.Config = tlsConfig |
| 232 | } |
| 233 | config.Producer.RequiredAcks = sarama.WaitForLocal // Only wait for the leader to ack |
| 234 | config.Producer.Compression = sarama.CompressionSnappy // Compress messages |
| 235 | config.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms |
| 236 | |
| 237 | producer, err := sarama.NewAsyncProducer(brokerList, config) |
| 238 | if err != nil { |
| 239 | log.Fatalln("Failed to start Sarama producer:", err) |
| 240 | } |
| 241 | |
| 242 | // We will just log to STDOUT if we're not able to produce messages. |
| 243 | // Note: messages will only be returned here after all retry attempts are exhausted. |
| 244 | go func() { |
| 245 | for err := range producer.Errors() { |
| 246 | log.Println("Failed to write access log entry:", err) |
| 247 | } |
| 248 | }() |
| 249 | |
| 250 | return producer |
| 251 | } |
no test coverage detected