MCPcopy
hub / github.com/IBM/sarama / TestFuncTxnProduce

Function TestFuncTxnProduce

functional_producer_test.go:156–207  ·  functional_producer_test.go::TestFuncTxnProduce
(t *testing.T)

Source from the content-addressed store, hash-verified

154}
155
156func TestFuncTxnProduce(t *testing.T) {
157 checkKafkaVersion(t, "0.11.0.0")
158 setupFunctionalTest(t)
159 defer teardownFunctionalTest(t)
160
161 config := NewFunctionalTestConfig()
162 config.ChannelBufferSize = 20
163 config.Producer.Flush.Frequency = 50 * time.Millisecond
164 config.Producer.Flush.Messages = 200
165 config.Producer.Idempotent = true
166 config.Producer.Transaction.ID = "TestFuncTxnProduce"
167 config.Producer.RequiredAcks = WaitForAll
168 config.Producer.Transaction.Retry.Max = 200
169 config.Consumer.IsolationLevel = ReadCommitted
170 config.Net.MaxOpenRequests = 1
171
172 consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, config)
173 require.NoError(t, err)
174 defer consumer.Close()
175
176 pc, err := consumer.ConsumePartition("test.1", 0, OffsetNewest)
177 require.NoError(t, err)
178 msgChannel := pc.Messages()
179 defer pc.Close()
180
181 nonTransactionalProducer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, NewFunctionalTestConfig())
182 require.NoError(t, err)
183 defer nonTransactionalProducer.Close()
184
185 // Ensure consumer is started
186 nonTransactionalProducer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder("test")}
187 <-msgChannel
188
189 producer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config)
190 require.NoError(t, err)
191 defer producer.Close()
192
193 err = producer.BeginTxn()
194 require.NoError(t, err)
195
196 for i := 0; i < 1; i++ {
197 producer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder("test")}
198 }
199
200 err = producer.CommitTxn()
201 require.NoError(t, err)
202
203 for i := 0; i < 1; i++ {
204 msg := <-msgChannel
205 t.Logf("Received %s from %s-%d at offset %d", msg.Value, msg.Topic, msg.Partition, msg.Offset)
206 }
207}
208
209func TestFuncTxnProduceWithBrokerFailure(t *testing.T) {
210 checkKafkaVersion(t, "0.11.0.0")

Callers

nothing calls this directly

Calls 15

CloseMethod · 0.95
ConsumePartitionMethod · 0.95
CloseMethod · 0.95
InputMethod · 0.95
BeginTxnMethod · 0.95
CommitTxnMethod · 0.95
checkKafkaVersionFunction · 0.85
setupFunctionalTestFunction · 0.85
teardownFunctionalTestFunction · 0.85
NewFunctionalTestConfigFunction · 0.85
StringEncoderTypeAlias · 0.85
NewConsumerFunction · 0.70

Tested by

no test coverage detected