(t *testing.T)
| 597 | } |
| 598 | |
| 599 | func TestFuncTxnAbortedProduce(t *testing.T) { |
| 600 | checkKafkaVersion(t, "0.11.0.0") |
| 601 | setupFunctionalTest(t) |
| 602 | defer teardownFunctionalTest(t) |
| 603 | |
| 604 | config := NewFunctionalTestConfig() |
| 605 | config.ChannelBufferSize = 20 |
| 606 | config.Producer.Flush.Frequency = 50 * time.Millisecond |
| 607 | config.Producer.Flush.Messages = 200 |
| 608 | config.Producer.Idempotent = true |
| 609 | config.Producer.Transaction.ID = "TestFuncTxnAbortedProduce" |
| 610 | config.Producer.RequiredAcks = WaitForAll |
| 611 | config.Producer.Return.Successes = true |
| 612 | config.Producer.Transaction.Retry.Max = 200 |
| 613 | config.Consumer.IsolationLevel = ReadCommitted |
| 614 | config.Net.MaxOpenRequests = 1 |
| 615 | |
| 616 | client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config) |
| 617 | require.NoError(t, err) |
| 618 | defer client.Close() |
| 619 | |
| 620 | consumer, err := NewConsumerFromClient(client) |
| 621 | require.NoError(t, err) |
| 622 | defer consumer.Close() |
| 623 | |
| 624 | pc, err := consumer.ConsumePartition("test.1", 0, OffsetNewest) |
| 625 | require.NoError(t, err) |
| 626 | msgChannel := pc.Messages() |
| 627 | defer pc.Close() |
| 628 | |
| 629 | nonTransactionalConfig := NewFunctionalTestConfig() |
| 630 | nonTransactionalConfig.Producer.Return.Successes = true |
| 631 | nonTransactionalConfig.Producer.Return.Errors = true |
| 632 | |
| 633 | nonTransactionalProducer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, nonTransactionalConfig) |
| 634 | require.NoError(t, err) |
| 635 | defer nonTransactionalProducer.Close() |
| 636 | |
| 637 | // Ensure consumer is started |
| 638 | nonTransactionalProducer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder("test")} |
| 639 | <-msgChannel |
| 640 | |
| 641 | producer, err := NewAsyncProducerFromClient(client) |
| 642 | require.NoError(t, err) |
| 643 | defer producer.Close() |
| 644 | |
| 645 | err = producer.BeginTxn() |
| 646 | require.NoError(t, err) |
| 647 | |
| 648 | for i := 0; i < 2; i++ { |
| 649 | producer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder("transactional")} |
| 650 | } |
| 651 | |
| 652 | for i := 0; i < 2; i++ { |
| 653 | <-producer.Successes() |
| 654 | } |
| 655 | |
| 656 | err = producer.AbortTxn() |
nothing calls this directly
no test coverage detected