MCPcopy
hub / github.com/IBM/sarama / TestAsyncProducerBrokerBounce

Function TestAsyncProducerBrokerBounce

async_producer_test.go:476–515  ·  view source on GitHub ↗

If a Kafka broker becomes unavailable and then returns to service, the producer reconnects to it and continues sending messages.

(t *testing.T)

Source from the content-addressed store, hash-verified

474// If a Kafka broker becomes unavailable and then returns back in service, then
475// producer reconnects to it and continues sending messages.
476func TestAsyncProducerBrokerBounce(t *testing.T) {
477 // Given
478 seedBroker := NewMockBroker(t, 1)
479 leader := NewMockBroker(t, 2)
480 leaderAddr := leader.Addr()
481
482 metadataResponse := new(MetadataResponse)
483 metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
484 metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
485 seedBroker.Returns(metadataResponse)
486
487 prodSuccess := new(ProduceResponse)
488 prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
489
490 config := NewTestConfig()
491 config.Producer.Flush.Messages = 1
492 config.Producer.Return.Successes = true
493 config.Producer.Retry.Backoff = 0
494 producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
495 if err != nil {
496 t.Fatal(err)
497 }
498 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
499 leader.Returns(prodSuccess)
500 expectResults(t, producer, 1, 0)
501
502 // When: a broker connection gets reset by a broker (network glitch, restart, you name it).
503 leader.Close() // producer should get EOF
504 leader = NewMockBrokerAddr(t, 2, leaderAddr) // start it up again right away for giggles
505 leader.Returns(metadataResponse) // tell it to go to broker 2 again
506
507 // Then: a produced message goes through the new broker connection.
508 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
509 leader.Returns(prodSuccess)
510 expectResults(t, producer, 1, 0)
511
512 closeProducer(t, producer)
513 seedBroker.Close()
514 leader.Close()
515}
516
517func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) {
518 seedBroker := NewMockBroker(t, 1)

Callers

nothing calls this directly

Calls 15

Addr (Method) · 0.95
BrokerID (Method) · 0.95
Returns (Method) · 0.95
Input (Method) · 0.95
Close (Method) · 0.95
NewMockBroker (Function) · 0.85
StringEncoder (TypeAlias) · 0.85
expectResults (Function) · 0.85
NewMockBrokerAddr (Function) · 0.85
closeProducer (Function) · 0.85
AddBroker (Method) · 0.80
Fatal (Method) · 0.80

Tested by

no test coverage detected