(topic string)
| 681 | } |
| 682 | |
| 683 | func (p *asyncProducer) newTopicProducer(topic string) chan<- *ProducerMessage { |
| 684 | input := make(chan *ProducerMessage, p.conf.ChannelBufferSize) |
| 685 | tp := &topicProducer{ |
| 686 | parent: p, |
| 687 | topic: topic, |
| 688 | input: input, |
| 689 | breaker: breaker.New(3, 1, 10*time.Second), |
| 690 | handlers: make(map[int32]chan<- *ProducerMessage), |
| 691 | partitioner: p.conf.Producer.Partitioner(topic), |
| 692 | } |
| 693 | go withRecover(tp.dispatch) |
| 694 | return input |
| 695 | } |
| 696 | |
| 697 | func (tp *topicProducer) dispatch() { |
| 698 | for msg := range tp.input { |
no test coverage detected