MCPcopy
hub / github.com/IBM/sarama / TestFuncProducingIdempotentWithBrokerFailure

Function TestFuncProducingIdempotentWithBrokerFailure

functional_producer_test.go:689–763  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

687}
688
689func TestFuncProducingIdempotentWithBrokerFailure(t *testing.T) {
690 setupFunctionalTest(t)
691 defer teardownFunctionalTest(t)
692
693 config := NewFunctionalTestConfig()
694 config.Producer.Flush.Frequency = 250 * time.Millisecond
695 config.Producer.Idempotent = true
696 config.Producer.Timeout = 500 * time.Millisecond
697 config.Producer.Retry.Max = 1
698 config.Producer.Retry.Backoff = 500 * time.Millisecond
699 config.Producer.Return.Successes = true
700 config.Producer.Return.Errors = true
701 config.Producer.RequiredAcks = WaitForAll
702 config.Net.MaxOpenRequests = 1
703
704 producer, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config)
705 if err != nil {
706 t.Fatal(err)
707 }
708 defer safeClose(t, producer)
709
710 // Successfully publish a few messages
711 for i := 0; i < 10; i++ {
712 _, _, err = producer.SendMessage(&ProducerMessage{
713 Topic: "test.1",
714 Value: StringEncoder(fmt.Sprintf("%d message", i)),
715 })
716 if err != nil {
717 t.Fatal(err)
718 }
719 }
720
721 // break the brokers.
722 for proxyName, proxy := range FunctionalTestEnv.Proxies {
723 if !strings.Contains(proxyName, "kafka") {
724 continue
725 }
726 if err := proxy.Disable(); err != nil {
727 t.Fatal(err)
728 }
729 }
730
731 // This should fail hard now
732 for i := 10; i < 20; i++ {
733 _, _, err = producer.SendMessage(&ProducerMessage{
734 Topic: "test.1",
735 Value: StringEncoder(fmt.Sprintf("%d message", i)),
736 })
737 if err == nil {
738 t.Fatal(err)
739 }
740 }
741
742 // Now bring the proxy back up
743 for proxyName, proxy := range FunctionalTestEnv.Proxies {
744 if !strings.Contains(proxyName, "kafka") {
745 continue
746 }

Callers

nothing calls this directly

Calls 10

SendMessage (Method) · 0.95
setupFunctionalTest (Function) · 0.85
teardownFunctionalTest (Function) · 0.85
NewFunctionalTestConfig (Function) · 0.85
StringEncoder (TypeAlias) · 0.85
Fatal (Method) · 0.80
Disable (Method) · 0.80
Enable (Method) · 0.80
NewSyncProducer (Function) · 0.70
safeClose (Function) · 0.70

Tested by

no test coverage detected