(t *testing.T)
| 272 | } |
| 273 | |
| 274 | func TestFuncTxnProduceEpochBump(t *testing.T) { |
| 275 | checkKafkaVersion(t, "2.6.0") |
| 276 | setupFunctionalTest(t) |
| 277 | defer teardownFunctionalTest(t) |
| 278 | |
| 279 | config := NewFunctionalTestConfig() |
| 280 | config.ChannelBufferSize = 20 |
| 281 | config.Producer.Flush.Frequency = 50 * time.Millisecond |
| 282 | config.Producer.Flush.Messages = 200 |
| 283 | config.Producer.Idempotent = true |
| 284 | config.Producer.Transaction.ID = "TestFuncTxnProduceEpochBump" |
| 285 | config.Producer.RequiredAcks = WaitForAll |
| 286 | config.Producer.Transaction.Retry.Max = 200 |
| 287 | config.Consumer.IsolationLevel = ReadCommitted |
| 288 | config.Net.MaxOpenRequests = 1 |
| 289 | |
| 290 | consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, config) |
| 291 | require.NoError(t, err) |
| 292 | defer consumer.Close() |
| 293 | |
| 294 | pc, err := consumer.ConsumePartition("test.1", 0, OffsetNewest) |
| 295 | require.NoError(t, err) |
| 296 | msgChannel := pc.Messages() |
| 297 | defer pc.Close() |
| 298 | |
| 299 | nonTransactionalProducer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, NewFunctionalTestConfig()) |
| 300 | require.NoError(t, err) |
| 301 | defer nonTransactionalProducer.Close() |
| 302 | |
| 303 | // Ensure consumer is started |
| 304 | nonTransactionalProducer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder("test")} |
| 305 | <-msgChannel |
| 306 | |
| 307 | producer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config) |
| 308 | require.NoError(t, err) |
| 309 | defer producer.Close() |
| 310 | |
| 311 | err = producer.BeginTxn() |
| 312 | require.NoError(t, err) |
| 313 | |
| 314 | for i := 0; i < 1; i++ { |
| 315 | producer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder("test")} |
| 316 | } |
| 317 | |
| 318 | err = producer.CommitTxn() |
| 319 | require.NoError(t, err) |
| 320 | |
| 321 | for i := 0; i < 1; i++ { |
| 322 | msg := <-msgChannel |
| 323 | t.Logf("Received %s from %s-%d at offset %d", msg.Value, msg.Topic, msg.Partition, msg.Offset) |
| 324 | } |
| 325 | |
| 326 | err = producer.BeginTxn() |
| 327 | require.NoError(t, err) |
| 328 | |
| 329 | for i := 0; i < 1; i++ { |
| 330 | producer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder("test")} |
| 331 | } |
Nothing calls this directly.
No test coverage detected.