(brokerList []string, version sarama.KafkaVersion)
| 193 | } |
| 194 | |
| 195 | func newDataCollector(brokerList []string, version sarama.KafkaVersion) sarama.SyncProducer { |
| 196 | // For the data collector, we are looking for strong consistency semantics. |
| 197 | // Because we don't change the flush settings, sarama will try to produce messages |
| 198 | // as fast as possible to keep latency low. |
| 199 | config := sarama.NewConfig() |
| 200 | config.Version = version |
| 201 | config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message |
| 202 | config.Producer.Retry.Max = 10 // Retry up to 10 times to produce the message |
| 203 | config.Producer.Return.Successes = true |
| 204 | tlsConfig := createTlsConfiguration() |
| 205 | if tlsConfig != nil { |
| 206 | config.Net.TLS.Config = tlsConfig |
| 207 | config.Net.TLS.Enable = true |
| 208 | } |
| 209 | |
| 210 | // On the broker side, you may want to change the following settings to get |
| 211 | // stronger consistency guarantees: |
| 212 | // - For your broker, set `unclean.leader.election.enable` to false |
| 213 | // - For the topic, you could increase `min.insync.replicas`. |
| 214 | |
| 215 | producer, err := sarama.NewSyncProducer(brokerList, config) |
| 216 | if err != nil { |
| 217 | log.Fatalln("Failed to start Sarama producer:", err) |
| 218 | } |
| 219 | |
| 220 | return producer |
| 221 | } |
| 222 | |
| 223 | func newAccessLogProducer(brokerList []string, version sarama.KafkaVersion) sarama.AsyncProducer { |
| 224 | // For the access log, we are looking for AP semantics, with high throughput. |
no test coverage detected