TestAsyncProducerRemoteBrokerClosed ensures that the async producer can cleanly recover if network connectivity to the remote brokers is lost and then subsequently resumed. https://github.com/IBM/sarama/issues/2129
(t *testing.T)
| 1080 | // |
| 1081 | // https://github.com/IBM/sarama/issues/2129 |
| 1082 | func TestAsyncProducerRemoteBrokerClosed(t *testing.T) { |
| 1083 | setupFunctionalTest(t) |
| 1084 | defer teardownFunctionalTest(t) |
| 1085 | |
| 1086 | config := NewFunctionalTestConfig() |
| 1087 | config.ClientID = t.Name() |
| 1088 | config.Net.MaxOpenRequests = 1 |
| 1089 | config.Producer.Flush.MaxMessages = 1 |
| 1090 | config.Producer.Return.Successes = true |
| 1091 | config.Producer.Retry.Max = 1024 |
| 1092 | config.Producer.Retry.Backoff = time.Millisecond * 50 |
| 1093 | |
| 1094 | producer, err := NewAsyncProducer( |
| 1095 | FunctionalTestEnv.KafkaBrokerAddrs, |
| 1096 | config, |
| 1097 | ) |
| 1098 | if err != nil { |
| 1099 | t.Fatal(err) |
| 1100 | } |
| 1101 | |
| 1102 | // produce some more messages and ensure success |
| 1103 | for i := 0; i < 10; i++ { |
| 1104 | producer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder(TestMessage)} |
| 1105 | <-producer.Successes() |
| 1106 | } |
| 1107 | |
| 1108 | // shutdown all the active tcp connections |
| 1109 | for _, proxy := range FunctionalTestEnv.Proxies { |
| 1110 | _ = proxy.Disable() |
| 1111 | } |
| 1112 | |
| 1113 | // produce some more messages |
| 1114 | for i := 10; i < 20; i++ { |
| 1115 | producer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder(TestMessage)} |
| 1116 | } |
| 1117 | |
| 1118 | // re-open the proxies |
| 1119 | for _, proxy := range FunctionalTestEnv.Proxies { |
| 1120 | _ = proxy.Enable() |
| 1121 | } |
| 1122 | |
| 1123 | // ensure the previously produced messages succeed |
| 1124 | for i := 10; i < 20; i++ { |
| 1125 | <-producer.Successes() |
| 1126 | } |
| 1127 | |
| 1128 | // produce some more messages and ensure success |
| 1129 | for i := 20; i < 30; i++ { |
| 1130 | producer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder(TestMessage)} |
| 1131 | <-producer.Successes() |
| 1132 | } |
| 1133 | |
| 1134 | closeProducer(t, producer) |
| 1135 | } |
| 1136 | |
| 1137 | func validateProducerMetrics(t *testing.T, client Client) { |
| 1138 | // Get the broker used by test1 topic |
nothing calls this directly
no test coverage detected