MCPcopy
hub / github.com/IBM/sarama / NewAsyncProducer

Function NewAsyncProducer

mocks/async_producer.go:35–119  ·  view source on GitHub ↗

NewAsyncProducer instantiates a new Producer mock. The t argument should be the *testing.T instance of your test method. An error will be written to it if an expectation is violated. The config argument is validated and used to determine whether it should ack successes on the Successes channel and h

(t ErrorReporter, config *sarama.Config)

Source from the content-addressed store, hash-verified

33// an expectation is violated. The config argument is validated and used to determine
34// whether it should ack successes on the Successes channel and handle partitioning.
35func NewAsyncProducer(t ErrorReporter, config *sarama.Config) *AsyncProducer {
36 if config == nil {
37 config = sarama.NewConfig()
38 }
39 if err := config.Validate(); err != nil {
40 t.Errorf("Invalid mock configuration provided: %s", err.Error())
41 }
42 mp := &AsyncProducer{
43 t: t,
44 closed: make(chan struct{}),
45 expectations: make([]*producerExpectation, 0),
46 input: make(chan *sarama.ProducerMessage, config.ChannelBufferSize),
47 successes: make(chan *sarama.ProducerMessage, config.ChannelBufferSize),
48 errors: make(chan *sarama.ProducerError, config.ChannelBufferSize),
49 isTransactional: config.Producer.Transaction.ID != "",
50 txnStatus: sarama.ProducerTxnFlagReady,
51 TopicConfig: NewTopicConfig(),
52 }
53
54 go func() {
55 defer func() {
56 close(mp.successes)
57 close(mp.errors)
58 close(mp.closed)
59 }()
60
61 partitioners := make(map[string]sarama.Partitioner, 1)
62
63 for msg := range mp.input {
64 mp.txnLock.Lock()
65 if mp.IsTransactional() && mp.txnStatus&sarama.ProducerTxnFlagInTransaction == 0 {
66 mp.t.Errorf("attempt to send message when transaction is not started or is in ending state.")
67 mp.errors <- &sarama.ProducerError{Err: errors.New("attempt to send message when transaction is not started or is in ending state"), Msg: msg}
68 continue
69 }
70 mp.txnLock.Unlock()
71 partitioner := partitioners[msg.Topic]
72 if partitioner == nil {
73 partitioner = config.Producer.Partitioner(msg.Topic)
74 partitioners[msg.Topic] = partitioner
75 }
76 mp.l.Lock()
77 if len(mp.expectations) == 0 {
78 mp.expectations = nil
79 mp.t.Errorf("No more expectation set on this mock producer to handle the input message.")
80 } else {
81 expectation := mp.expectations[0]
82 mp.expectations = mp.expectations[1:]
83
84 partition, err := partitioner.Partition(msg, mp.partitions(msg.Topic))
85 if err != nil {
86 mp.t.Errorf("Partitioner returned an error: %s", err.Error())
87 mp.errors <- &sarama.ProducerError{Err: err, Msg: msg}
88 } else {
89 msg.Partition = partition
90 if expectation.CheckFunction != nil {
91 err := expectation.CheckFunction(msg)
92 if err != nil {

Calls 9

IsTransactionalMethod · 0.95
NewConfigFunction · 0.92
NewTopicConfigFunction · 0.85
ValidateMethod · 0.80
IsMethod · 0.80
ErrorfMethod · 0.65
ErrorMethod · 0.65
PartitionMethod · 0.65
partitionsMethod · 0.65