MCPcopy
hub / github.com/IBM/sarama / TestFuncTxnAbortedProduce

Function TestFuncTxnAbortedProduce

functional_producer_test.go:599–669  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

597}
598
599func TestFuncTxnAbortedProduce(t *testing.T) {
600 checkKafkaVersion(t, "0.11.0.0")
601 setupFunctionalTest(t)
602 defer teardownFunctionalTest(t)
603
604 config := NewFunctionalTestConfig()
605 config.ChannelBufferSize = 20
606 config.Producer.Flush.Frequency = 50 * time.Millisecond
607 config.Producer.Flush.Messages = 200
608 config.Producer.Idempotent = true
609 config.Producer.Transaction.ID = "TestFuncTxnAbortedProduce"
610 config.Producer.RequiredAcks = WaitForAll
611 config.Producer.Return.Successes = true
612 config.Producer.Transaction.Retry.Max = 200
613 config.Consumer.IsolationLevel = ReadCommitted
614 config.Net.MaxOpenRequests = 1
615
616 client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
617 require.NoError(t, err)
618 defer client.Close()
619
620 consumer, err := NewConsumerFromClient(client)
621 require.NoError(t, err)
622 defer consumer.Close()
623
624 pc, err := consumer.ConsumePartition("test.1", 0, OffsetNewest)
625 require.NoError(t, err)
626 msgChannel := pc.Messages()
627 defer pc.Close()
628
629 nonTransactionalConfig := NewFunctionalTestConfig()
630 nonTransactionalConfig.Producer.Return.Successes = true
631 nonTransactionalConfig.Producer.Return.Errors = true
632
633 nonTransactionalProducer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, nonTransactionalConfig)
634 require.NoError(t, err)
635 defer nonTransactionalProducer.Close()
636
637 // Ensure consumer is started
638 nonTransactionalProducer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder("test")}
639 <-msgChannel
640
641 producer, err := NewAsyncProducerFromClient(client)
642 require.NoError(t, err)
643 defer producer.Close()
644
645 err = producer.BeginTxn()
646 require.NoError(t, err)
647
648 for i := 0; i < 2; i++ {
649 producer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder("transactional")}
650 }
651
652 for i := 0; i < 2; i++ {
653 <-producer.Successes()
654 }
655
656 err = producer.AbortTxn()

Callers

nothing calls this directly

Calls 15

CloseMethod · 0.95
CloseMethod · 0.95
InputMethod · 0.95
SuccessesMethod · 0.95
checkKafkaVersionFunction · 0.85
setupFunctionalTestFunction · 0.85
teardownFunctionalTestFunction · 0.85
NewFunctionalTestConfigFunction · 0.85
NewConsumerFromClientFunction · 0.85
StringEncoderTypeAlias · 0.85
NewClientFunction · 0.70

Tested by

no test coverage detected