MCPcopy
hub / github.com/IBM/sarama / newAccessLogProducer

Function newAccessLogProducer

examples/http_server/http_server.go:223–251  ·  view source on GitHub ↗
(brokerList []string, version sarama.KafkaVersion)

Source from the content-addressed store, hash-verified

221}
222
223func newAccessLogProducer(brokerList []string, version sarama.KafkaVersion) sarama.AsyncProducer {
224 // For the access log, we are looking for AP semantics, with high throughput.
225 // By creating batches of compressed messages, we reduce network I/O at a cost of more latency.
226 config := sarama.NewConfig()
227 config.Version = version
228 tlsConfig := createTlsConfiguration()
229 if tlsConfig != nil {
230 config.Net.TLS.Enable = true
231 config.Net.TLS.Config = tlsConfig
232 }
233 config.Producer.RequiredAcks = sarama.WaitForLocal // Only wait for the leader to ack
234 config.Producer.Compression = sarama.CompressionSnappy // Compress messages
235 config.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms
236
237 producer, err := sarama.NewAsyncProducer(brokerList, config)
238 if err != nil {
239 log.Fatalln("Failed to start Sarama producer:", err)
240 }
241
242 // We will just log to STDOUT if we're not able to produce messages.
243 // Note: messages will only be returned here after all retry attempts are exhausted.
244 go func() {
245 for err := range producer.Errors() {
246 log.Println("Failed to write access log entry:", err)
247 }
248 }()
249
250 return producer
251}

Callers 1

mainFunction · 0.85

Calls 5

ErrorsMethod · 0.95
NewConfigFunction · 0.92
NewAsyncProducerFunction · 0.92
createTlsConfigurationFunction · 0.85
PrintlnMethod · 0.65

Tested by

no test coverage detected