MCPcopy
hub / github.com/IBM/sarama / TestAsyncProducerMultipleFlushes

Function TestAsyncProducerMultipleFlushes

async_producer_test.go:187–220  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

185}
186
187func TestAsyncProducerMultipleFlushes(t *testing.T) {
188 seedBroker := NewMockBroker(t, 1)
189 leader := NewMockBroker(t, 2)
190
191 metadataResponse := new(MetadataResponse)
192 metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
193 metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
194 seedBroker.Returns(metadataResponse)
195
196 prodSuccess := new(ProduceResponse)
197 prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
198 leader.Returns(prodSuccess)
199 leader.Returns(prodSuccess)
200 leader.Returns(prodSuccess)
201
202 config := NewTestConfig()
203 config.Producer.Flush.Messages = 5
204 config.Producer.Return.Successes = true
205 producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
206 if err != nil {
207 t.Fatal(err)
208 }
209
210 for range 3 {
211 for range 5 {
212 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
213 }
214 expectResults(t, producer, 5, 0)
215 }
216
217 closeProducer(t, producer)
218 leader.Close()
219 seedBroker.Close()
220}
221
222func TestAsyncProducerMultipleBrokers(t *testing.T) {
223 seedBroker := NewMockBroker(t, 1)

Callers

nothing calls this directly

Calls 14

Addr (Method) · 0.95
BrokerID (Method) · 0.95
Returns (Method) · 0.95
Input (Method) · 0.95
Close (Method) · 0.95
NewMockBroker (Function) · 0.85
StringEncoder (TypeAlias) · 0.85
expectResults (Function) · 0.85
closeProducer (Function) · 0.85
AddBroker (Method) · 0.80
Fatal (Method) · 0.80
NewTestConfig (Function) · 0.70

Tested by

no test coverage detected