MCPcopy
hub / github.com/IBM/sarama / dispatch

Method dispatch

async_producer.go:697–718  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

695}
696
697func (tp *topicProducer) dispatch() {
698 for msg := range tp.input {
699 if msg.retries == 0 {
700 if err := tp.partitionMessage(msg); err != nil {
701 tp.parent.returnError(msg, err)
702 continue
703 }
704 }
705
706 handler := tp.handlers[msg.Partition]
707 if handler == nil {
708 handler = tp.parent.newPartitionProducer(msg.Topic, msg.Partition)
709 tp.handlers[msg.Partition] = handler
710 }
711
712 handler <- msg
713 }
714
715 for _, handler := range tp.handlers {
716 close(handler)
717 }
718}
719
720func (tp *topicProducer) partitionMessage(msg *ProducerMessage) error {
721 var partitions []int32

Callers

nothing calls this directly

Calls 3

partitionMessageMethod · 0.95
returnErrorMethod · 0.80
newPartitionProducerMethod · 0.80

Tested by

no test coverage detected