MCPcopy
hub / github.com/IBM/sarama / TestAsyncProducerRemoteBrokerClosed

Function TestAsyncProducerRemoteBrokerClosed

functional_producer_test.go:1082–1135  ·  view source on GitHub ↗

TestAsyncProducerRemoteBrokerClosed ensures that the async producer can cleanly recover if network connectivity to the remote brokers is lost and then subsequently resumed. https://github.com/IBM/sarama/issues/2129

(t *testing.T)

Source from the content-addressed store, hash-verified

1080//
1081// https://github.com/IBM/sarama/issues/2129
1082func TestAsyncProducerRemoteBrokerClosed(t *testing.T) {
1083 setupFunctionalTest(t)
1084 defer teardownFunctionalTest(t)
1085
1086 config := NewFunctionalTestConfig()
1087 config.ClientID = t.Name()
1088 config.Net.MaxOpenRequests = 1
1089 config.Producer.Flush.MaxMessages = 1
1090 config.Producer.Return.Successes = true
1091 config.Producer.Retry.Max = 1024
1092 config.Producer.Retry.Backoff = time.Millisecond * 50
1093
1094 producer, err := NewAsyncProducer(
1095 FunctionalTestEnv.KafkaBrokerAddrs,
1096 config,
1097 )
1098 if err != nil {
1099 t.Fatal(err)
1100 }
1101
1102 // produce some more messages and ensure success
1103 for i := 0; i < 10; i++ {
1104 producer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder(TestMessage)}
1105 <-producer.Successes()
1106 }
1107
1108 // shutdown all the active tcp connections
1109 for _, proxy := range FunctionalTestEnv.Proxies {
1110 _ = proxy.Disable()
1111 }
1112
1113 // produce some more messages
1114 for i := 10; i < 20; i++ {
1115 producer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder(TestMessage)}
1116 }
1117
1118 // re-open the proxies
1119 for _, proxy := range FunctionalTestEnv.Proxies {
1120 _ = proxy.Enable()
1121 }
1122
1123 // ensure the previously produced messages succeed
1124 for i := 10; i < 20; i++ {
1125 <-producer.Successes()
1126 }
1127
1128 // produce some more messages and ensure success
1129 for i := 20; i < 30; i++ {
1130 producer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder(TestMessage)}
1131 <-producer.Successes()
1132 }
1133
1134 closeProducer(t, producer)
1135}
1136
1137func validateProducerMetrics(t *testing.T, client Client) {
1138 // Get the broker used by test1 topic

Callers

nothing calls this directly

Calls 12

InputMethod · 0.95
SuccessesMethod · 0.95
setupFunctionalTestFunction · 0.85
teardownFunctionalTestFunction · 0.85
NewFunctionalTestConfigFunction · 0.85
StringEncoderTypeAlias · 0.85
closeProducerFunction · 0.85
FatalMethod · 0.80
DisableMethod · 0.80
EnableMethod · 0.80
NewAsyncProducerFunction · 0.70
NameMethod · 0.65

Tested by

no test coverage detected