MCPcopy
hub / github.com/IBM/sarama / TestAsyncProducerFailureRetry

Function TestAsyncProducerFailureRetry

async_producer_test.go:304–350  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

302}
303
304func TestAsyncProducerFailureRetry(t *testing.T) {
305 seedBroker := NewMockBroker(t, 1)
306 leader1 := NewMockBroker(t, 2)
307 leader2 := NewMockBroker(t, 3)
308
309 metadataLeader1 := new(MetadataResponse)
310 metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
311 metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError)
312 seedBroker.Returns(metadataLeader1)
313
314 config := NewTestConfig()
315 config.Producer.Flush.Messages = 10
316 config.Producer.Return.Successes = true
317 config.Producer.Retry.Backoff = 0
318 producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
319 if err != nil {
320 t.Fatal(err)
321 }
322 seedBroker.Close()
323
324 for range 10 {
325 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
326 }
327 prodNotLeader := new(ProduceResponse)
328 prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
329 leader1.Returns(prodNotLeader)
330
331 metadataLeader2 := new(MetadataResponse)
332 metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
333 metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError)
334 leader1.Returns(metadataLeader2)
335
336 prodSuccess := new(ProduceResponse)
337 prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
338 leader2.Returns(prodSuccess)
339 expectResults(t, producer, 10, 0)
340 leader1.Close()
341
342 for range 10 {
343 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
344 }
345 leader2.Returns(prodSuccess)
346 expectResults(t, producer, 10, 0)
347
348 leader2.Close()
349 closeProducer(t, producer)
350}
351
352func TestAsyncProducerRecoveryWithRetriesDisabled(t *testing.T) {
353 tt := func(t *testing.T, kErr KError) {

Callers

nothing calls this directly

Calls 14

AddrMethod · 0.95
BrokerIDMethod · 0.95
ReturnsMethod · 0.95
CloseMethod · 0.95
InputMethod · 0.95
NewMockBrokerFunction · 0.85
StringEncoderTypeAlias · 0.85
expectResultsFunction · 0.85
closeProducerFunction · 0.85
AddBrokerMethod · 0.80
FatalMethod · 0.80
NewTestConfigFunction · 0.70

Tested by

no test coverage detected