(t *testing.T)
| 207 | } |
| 208 | |
| 209 | func TestFuncTxnProduceWithBrokerFailure(t *testing.T) { |
| 210 | checkKafkaVersion(t, "0.11.0.0") |
| 211 | setupFunctionalTest(t) |
| 212 | defer teardownFunctionalTest(t) |
| 213 | |
| 214 | config := NewFunctionalTestConfig() |
| 215 | config.ChannelBufferSize = 20 |
| 216 | config.Producer.Flush.Frequency = 50 * time.Millisecond |
| 217 | config.Producer.Flush.Messages = 200 |
| 218 | config.Producer.Idempotent = true |
| 219 | config.Producer.Transaction.ID = "TestFuncTxnProduceWithBrokerFailure" |
| 220 | config.Producer.RequiredAcks = WaitForAll |
| 221 | config.Producer.Transaction.Retry.Max = 200 |
| 222 | config.Consumer.IsolationLevel = ReadCommitted |
| 223 | config.Net.MaxOpenRequests = 1 |
| 224 | |
| 225 | consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, config) |
| 226 | require.NoError(t, err) |
| 227 | defer consumer.Close() |
| 228 | |
| 229 | pc, err := consumer.ConsumePartition("test.1", 0, OffsetNewest) |
| 230 | require.NoError(t, err) |
| 231 | msgChannel := pc.Messages() |
| 232 | defer pc.Close() |
| 233 | |
| 234 | nonTransactionalProducer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, NewFunctionalTestConfig()) |
| 235 | require.NoError(t, err) |
| 236 | defer nonTransactionalProducer.Close() |
| 237 | |
| 238 | // Ensure consumer is started |
| 239 | nonTransactionalProducer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder("test")} |
| 240 | <-msgChannel |
| 241 | |
| 242 | producer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config) |
| 243 | require.NoError(t, err) |
| 244 | defer producer.Close() |
| 245 | |
| 246 | txCoordinator, _ := producer.(*asyncProducer).client.TransactionCoordinator(config.Producer.Transaction.ID) |
| 247 | |
| 248 | err = producer.BeginTxn() |
| 249 | require.NoError(t, err) |
| 250 | |
| 251 | defer func() { |
| 252 | if err := startDockerTestBroker(context.Background(), txCoordinator.id); err != nil { |
| 253 | t.Fatal(err) |
| 254 | } |
| 255 | t.Logf("\n") |
| 256 | }() |
| 257 | if err := stopDockerTestBroker(context.Background(), txCoordinator.id); err != nil { |
| 258 | t.Fatal(err) |
| 259 | } |
| 260 | |
| 261 | for i := 0; i < 1; i++ { |
| 262 | producer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder("test")} |
| 263 | } |
| 264 | |
| 265 | err = producer.CommitTxn() |
| 266 | require.NoError(t, err) |
nothing calls this directly
no test coverage detected