MCPcopy
hub / github.com/IBM/sarama / newDataCollector

Function newDataCollector

examples/http_server/http_server.go:195–221  ·  view source on GitHub ↗
(brokerList []string, version sarama.KafkaVersion)

Source from the content-addressed store, hash-verified

193}
194
195func newDataCollector(brokerList []string, version sarama.KafkaVersion) sarama.SyncProducer {
196 // For the data collector, we are looking for strong consistency semantics.
197 // Because we don't change the flush settings, sarama will try to produce messages
198 // as fast as possible to keep latency low.
199 config := sarama.NewConfig()
200 config.Version = version
201 config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
202 config.Producer.Retry.Max = 10 // Retry up to 10 times to produce the message
203 config.Producer.Return.Successes = true
204 tlsConfig := createTlsConfiguration()
205 if tlsConfig != nil {
206 config.Net.TLS.Config = tlsConfig
207 config.Net.TLS.Enable = true
208 }
209
210 // On the broker side, you may want to change the following settings to get
211 // stronger consistency guarantees:
212 // - For your broker, set `unclean.leader.election.enable` to false
213 // - For the topic, you could increase `min.insync.replicas`.
214
215 producer, err := sarama.NewSyncProducer(brokerList, config)
216 if err != nil {
217 log.Fatalln("Failed to start Sarama producer:", err)
218 }
219
220 return producer
221}
222
223func newAccessLogProducer(brokerList []string, version sarama.KafkaVersion) sarama.AsyncProducer {
224 // For the access log, we are looking for AP semantics, with high throughput.

Callers 1

mainFunction · 0.85

Calls 3

NewConfigFunction · 0.92
NewSyncProducerFunction · 0.92
createTlsConfigurationFunction · 0.85

Tested by

no test coverage detected