MCPcopy
hub / github.com/IBM/sarama / newProducerProvider

Function newProducerProvider

examples/exactly_once/main.go:320–336  ·  view source on GitHub ↗
(brokers []string, producerConfigurationProvider func() *sarama.Config)

Source from the content-addressed store, hash-verified

318}
319
320func newProducerProvider(brokers []string, producerConfigurationProvider func() *sarama.Config) *producerProvider {
321 provider := &producerProvider{
322 producers: make(map[topicPartition][]sarama.AsyncProducer),
323 }
324 provider.producerProvider = func(topic string, partition int32) sarama.AsyncProducer {
325 config := producerConfigurationProvider()
326 if config.Producer.Transaction.ID != "" {
327 config.Producer.Transaction.ID = config.Producer.Transaction.ID + "-" + topic + "-" + fmt.Sprint(partition)
328 }
329 producer, err := sarama.NewAsyncProducer(brokers, config)
330 if err != nil {
331 return nil
332 }
333 return producer
334 }
335 return provider
336}
337
338func (p *producerProvider) borrow(topic string, partition int32) (producer sarama.AsyncProducer) {
339 p.producersLock.Lock()

Callers 1

mainFunction · 0.70

Calls 1

NewAsyncProducerFunction · 0.92

Tested by

no test coverage detected