(t *testing.T)
| 154 | } |
| 155 | |
| 156 | func TestFuncTxnProduce(t *testing.T) { |
| 157 | checkKafkaVersion(t, "0.11.0.0") |
| 158 | setupFunctionalTest(t) |
| 159 | defer teardownFunctionalTest(t) |
| 160 | |
| 161 | config := NewFunctionalTestConfig() |
| 162 | config.ChannelBufferSize = 20 |
| 163 | config.Producer.Flush.Frequency = 50 * time.Millisecond |
| 164 | config.Producer.Flush.Messages = 200 |
| 165 | config.Producer.Idempotent = true |
| 166 | config.Producer.Transaction.ID = "TestFuncTxnProduce" |
| 167 | config.Producer.RequiredAcks = WaitForAll |
| 168 | config.Producer.Transaction.Retry.Max = 200 |
| 169 | config.Consumer.IsolationLevel = ReadCommitted |
| 170 | config.Net.MaxOpenRequests = 1 |
| 171 | |
| 172 | consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, config) |
| 173 | require.NoError(t, err) |
| 174 | defer consumer.Close() |
| 175 | |
| 176 | pc, err := consumer.ConsumePartition("test.1", 0, OffsetNewest) |
| 177 | require.NoError(t, err) |
| 178 | msgChannel := pc.Messages() |
| 179 | defer pc.Close() |
| 180 | |
| 181 | nonTransactionalProducer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, NewFunctionalTestConfig()) |
| 182 | require.NoError(t, err) |
| 183 | defer nonTransactionalProducer.Close() |
| 184 | |
| 185 | // Ensure consumer is started |
| 186 | nonTransactionalProducer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder("test")} |
| 187 | <-msgChannel |
| 188 | |
| 189 | producer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config) |
| 190 | require.NoError(t, err) |
| 191 | defer producer.Close() |
| 192 | |
| 193 | err = producer.BeginTxn() |
| 194 | require.NoError(t, err) |
| 195 | |
| 196 | for i := 0; i < 1; i++ { |
| 197 | producer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder("test")} |
| 198 | } |
| 199 | |
| 200 | err = producer.CommitTxn() |
| 201 | require.NoError(t, err) |
| 202 | |
| 203 | for i := 0; i < 1; i++ { |
| 204 | msg := <-msgChannel |
| 205 | t.Logf("Received %s from %s-%d at offset %d", msg.Value, msg.Topic, msg.Partition, msg.Offset) |
| 206 | } |
| 207 | } |
| 208 | |
| 209 | func TestFuncTxnProduceWithBrokerFailure(t *testing.T) { |
| 210 | checkKafkaVersion(t, "0.11.0.0") |
nothing calls this directly
no test coverage detected