(t *testing.T)
| 94 | } |
| 95 | |
| 96 | func TestFuncTxnProduceNoBegin(t *testing.T) { |
| 97 | checkKafkaVersion(t, "0.11.0.0") |
| 98 | setupFunctionalTest(t) |
| 99 | defer teardownFunctionalTest(t) |
| 100 | |
| 101 | config := NewFunctionalTestConfig() |
| 102 | config.ChannelBufferSize = 20 |
| 103 | config.Producer.Flush.Frequency = 50 * time.Millisecond |
| 104 | config.Producer.Flush.Messages = 200 |
| 105 | config.Producer.Idempotent = true |
| 106 | config.Producer.Transaction.ID = "TestFuncTxnProduceNoBegin" |
| 107 | config.Producer.RequiredAcks = WaitForAll |
| 108 | config.Producer.Retry.Max = 50 |
| 109 | config.Consumer.IsolationLevel = ReadCommitted |
| 110 | config.Producer.Return.Errors = true |
| 111 | config.Producer.Transaction.Retry.Max = 200 |
| 112 | config.Net.MaxOpenRequests = 1 |
| 113 | producer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config) |
| 114 | require.NoError(t, err) |
| 115 | defer producer.Close() |
| 116 | |
| 117 | producer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder("test")} |
| 118 | producerError := <-producer.Errors() |
| 119 | require.Error(t, producerError) |
| 120 | } |
| 121 | |
| 122 | func TestFuncTxnCommitNoMessages(t *testing.T) { |
| 123 | checkKafkaVersion(t, "0.11.0.0") |
nothing calls this directly
no test coverage detected