NewAsyncProducer instantiates a new Producer mock. The t argument should be the *testing.T instance of your test method. An error will be written to it if an expectation is violated. The config argument is validated and used to determine whether it should ack successes on the Successes channel and h
(t ErrorReporter, config *sarama.Config)
| 33 | // an expectation is violated. The config argument is validated and used to determine |
| 34 | // whether it should ack successes on the Successes channel and handle partitioning. |
| 35 | func NewAsyncProducer(t ErrorReporter, config *sarama.Config) *AsyncProducer { |
| 36 | if config == nil { |
| 37 | config = sarama.NewConfig() |
| 38 | } |
| 39 | if err := config.Validate(); err != nil { |
| 40 | t.Errorf("Invalid mock configuration provided: %s", err.Error()) |
| 41 | } |
| 42 | mp := &AsyncProducer{ |
| 43 | t: t, |
| 44 | closed: make(chan struct{}), |
| 45 | expectations: make([]*producerExpectation, 0), |
| 46 | input: make(chan *sarama.ProducerMessage, config.ChannelBufferSize), |
| 47 | successes: make(chan *sarama.ProducerMessage, config.ChannelBufferSize), |
| 48 | errors: make(chan *sarama.ProducerError, config.ChannelBufferSize), |
| 49 | isTransactional: config.Producer.Transaction.ID != "", |
| 50 | txnStatus: sarama.ProducerTxnFlagReady, |
| 51 | TopicConfig: NewTopicConfig(), |
| 52 | } |
| 53 | |
| 54 | go func() { |
| 55 | defer func() { |
| 56 | close(mp.successes) |
| 57 | close(mp.errors) |
| 58 | close(mp.closed) |
| 59 | }() |
| 60 | |
| 61 | partitioners := make(map[string]sarama.Partitioner, 1) |
| 62 | |
| 63 | for msg := range mp.input { |
| 64 | mp.txnLock.Lock() |
| 65 | if mp.IsTransactional() && mp.txnStatus&sarama.ProducerTxnFlagInTransaction == 0 { |
| 66 | mp.t.Errorf("attempt to send message when transaction is not started or is in ending state.") |
| 67 | mp.errors <- &sarama.ProducerError{Err: errors.New("attempt to send message when transaction is not started or is in ending state"), Msg: msg} |
| 68 | continue |
| 69 | } |
| 70 | mp.txnLock.Unlock() |
| 71 | partitioner := partitioners[msg.Topic] |
| 72 | if partitioner == nil { |
| 73 | partitioner = config.Producer.Partitioner(msg.Topic) |
| 74 | partitioners[msg.Topic] = partitioner |
| 75 | } |
| 76 | mp.l.Lock() |
| 77 | if len(mp.expectations) == 0 { |
| 78 | mp.expectations = nil |
| 79 | mp.t.Errorf("No more expectation set on this mock producer to handle the input message.") |
| 80 | } else { |
| 81 | expectation := mp.expectations[0] |
| 82 | mp.expectations = mp.expectations[1:] |
| 83 | |
| 84 | partition, err := partitioner.Partition(msg, mp.partitions(msg.Topic)) |
| 85 | if err != nil { |
| 86 | mp.t.Errorf("Partitioner returned an error: %s", err.Error()) |
| 87 | mp.errors <- &sarama.ProducerError{Err: err, Msg: msg} |
| 88 | } else { |
| 89 | msg.Partition = partition |
| 90 | if expectation.CheckFunction != nil { |
| 91 | err := expectation.CheckFunction(msg) |
| 92 | if err != nil { |