(brokers []string, producerConfigurationProvider func() *sarama.Config)
| 318 | } |
| 319 | |
| 320 | func newProducerProvider(brokers []string, producerConfigurationProvider func() *sarama.Config) *producerProvider { |
| 321 | provider := &producerProvider{ |
| 322 | producers: make(map[topicPartition][]sarama.AsyncProducer), |
| 323 | } |
| 324 | provider.producerProvider = func(topic string, partition int32) sarama.AsyncProducer { |
| 325 | config := producerConfigurationProvider() |
| 326 | if config.Producer.Transaction.ID != "" { |
| 327 | config.Producer.Transaction.ID = config.Producer.Transaction.ID + "-" + topic + "-" + fmt.Sprint(partition) |
| 328 | } |
| 329 | producer, err := sarama.NewAsyncProducer(brokers, config) |
| 330 | if err != nil { |
| 331 | return nil |
| 332 | } |
| 333 | return producer |
| 334 | } |
| 335 | return provider |
| 336 | } |
| 337 | |
| 338 | func (p *producerProvider) borrow(topic string, partition int32) (producer sarama.AsyncProducer) { |
| 339 | p.producersLock.Lock() |
no test coverage detected