(t *testing.T)
| 514 | } |
| 515 | |
| 516 | func TestFuncTxnProduceMultiTxn(t *testing.T) { |
| 517 | checkKafkaVersion(t, "0.11.0.0") |
| 518 | setupFunctionalTest(t) |
| 519 | defer teardownFunctionalTest(t) |
| 520 | |
| 521 | config := NewFunctionalTestConfig() |
| 522 | config.ChannelBufferSize = 20 |
| 523 | config.Producer.Flush.Frequency = 50 * time.Millisecond |
| 524 | config.Producer.Flush.Messages = 200 |
| 525 | config.Producer.Idempotent = true |
| 526 | config.Producer.Transaction.ID = "TestFuncTxnProduceMultiTxn" |
| 527 | config.Producer.RequiredAcks = WaitForAll |
| 528 | config.Producer.Transaction.Retry.Max = 200 |
| 529 | config.Consumer.IsolationLevel = ReadCommitted |
| 530 | config.Net.MaxOpenRequests = 1 |
| 531 | |
| 532 | configSecond := NewFunctionalTestConfig() |
| 533 | configSecond.ChannelBufferSize = 20 |
| 534 | configSecond.Producer.Flush.Frequency = 50 * time.Millisecond |
| 535 | configSecond.Producer.Flush.Messages = 200 |
| 536 | configSecond.Producer.Idempotent = true |
| 537 | configSecond.Producer.Transaction.ID = "TestFuncTxnProduceMultiTxn-second" |
| 538 | configSecond.Producer.RequiredAcks = WaitForAll |
| 539 | configSecond.Producer.Retry.Max = 50 |
| 540 | configSecond.Consumer.IsolationLevel = ReadCommitted |
| 541 | configSecond.Net.MaxOpenRequests = 1 |
| 542 | |
| 543 | consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, config) |
| 544 | require.NoError(t, err) |
| 545 | defer consumer.Close() |
| 546 | |
| 547 | pc, err := consumer.ConsumePartition("test.1", 0, OffsetNewest) |
| 548 | require.NoError(t, err) |
| 549 | msgChannel := pc.Messages() |
| 550 | defer pc.Close() |
| 551 | |
| 552 | nonTransactionalConfig := NewFunctionalTestConfig() |
| 553 | nonTransactionalConfig.Producer.Return.Successes = true |
| 554 | nonTransactionalConfig.Producer.Return.Errors = true |
| 555 | |
| 556 | nonTransactionalProducer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, nonTransactionalConfig) |
| 557 | require.NoError(t, err) |
| 558 | defer nonTransactionalProducer.Close() |
| 559 | |
| 560 | // Ensure the consumer is started: send one non-transactional message and block until it arrives on msgChannel |
| 561 | nonTransactionalProducer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder("test")} |
| 562 | <-msgChannel |
| 563 | |
| 564 | producer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config) |
| 565 | require.NoError(t, err) |
| 566 | defer producer.Close() |
| 567 | |
| 568 | producerSecond, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, configSecond) |
| 569 | require.NoError(t, err) |
| 570 | defer producerSecond.Close() |
| 571 | |
| 572 | err = producer.BeginTxn() |
| 573 | require.NoError(t, err) |
nothing calls this directly
no test coverage detected