(brokers []string, producerConfigurationProvider func() *sarama.Config)
| 172 | } |
| 173 | |
| 174 | func newProducerProvider(brokers []string, producerConfigurationProvider func() *sarama.Config) *producerProvider { |
| 175 | provider := &producerProvider{} |
| 176 | provider.producerProvider = func() sarama.AsyncProducer { |
| 177 | config := producerConfigurationProvider() |
| 178 | suffix := provider.transactionIdGenerator |
| 179 | // Append transactionIdGenerator to current config.Producer.Transaction.ID to ensure transaction-id uniqueness. |
| 180 | if config.Producer.Transaction.ID != "" { |
| 181 | provider.transactionIdGenerator++ |
| 182 | config.Producer.Transaction.ID = config.Producer.Transaction.ID + "-" + fmt.Sprint(suffix) |
| 183 | } |
| 184 | producer, err := sarama.NewAsyncProducer(brokers, config) |
| 185 | if err != nil { |
| 186 | return nil |
| 187 | } |
| 188 | return producer |
| 189 | } |
| 190 | return provider |
| 191 | } |
| 192 | |
| 193 | func (p *producerProvider) borrow() (producer sarama.AsyncProducer) { |
| 194 | p.producersLock.Lock() |
no test coverage detected