MCPcopy
hub / github.com/IBM/sarama / newTopicProducer

Method newTopicProducer

async_producer.go:683–695  ·  view source on GitHub ↗
(topic string)

Source from the content-addressed store, hash-verified

681}
682
683func (p *asyncProducer) newTopicProducer(topic string) chan<- *ProducerMessage {
684 input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
685 tp := &topicProducer{
686 parent: p,
687 topic: topic,
688 input: input,
689 breaker: breaker.New(3, 1, 10*time.Second),
690 handlers: make(map[int32]chan<- *ProducerMessage),
691 partitioner: p.conf.Producer.Partitioner(topic),
692 }
693 go withRecover(tp.dispatch)
694 return input
695}
696
697func (tp *topicProducer) dispatch() {
698 for msg := range tp.input {

Callers 1

dispatcherMethod · 0.95

Calls 1

withRecoverFunction · 0.85

Tested by

no test coverage detected