MCPcopy
hub / github.com/IBM/sarama / TestFuncTxnProduceWithBrokerFailure

Function TestFuncTxnProduceWithBrokerFailure

functional_producer_test.go:209–272  ·  functional_producer_test.go::TestFuncTxnProduceWithBrokerFailure
(t *testing.T)

Source from the content-addressed store, hash-verified

207}
208
209func TestFuncTxnProduceWithBrokerFailure(t *testing.T) {
210 checkKafkaVersion(t, "0.11.0.0")
211 setupFunctionalTest(t)
212 defer teardownFunctionalTest(t)
213
214 config := NewFunctionalTestConfig()
215 config.ChannelBufferSize = 20
216 config.Producer.Flush.Frequency = 50 * time.Millisecond
217 config.Producer.Flush.Messages = 200
218 config.Producer.Idempotent = true
219 config.Producer.Transaction.ID = "TestFuncTxnProduceWithBrokerFailure"
220 config.Producer.RequiredAcks = WaitForAll
221 config.Producer.Transaction.Retry.Max = 200
222 config.Consumer.IsolationLevel = ReadCommitted
223 config.Net.MaxOpenRequests = 1
224
225 consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, config)
226 require.NoError(t, err)
227 defer consumer.Close()
228
229 pc, err := consumer.ConsumePartition("test.1", 0, OffsetNewest)
230 require.NoError(t, err)
231 msgChannel := pc.Messages()
232 defer pc.Close()
233
234 nonTransactionalProducer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, NewFunctionalTestConfig())
235 require.NoError(t, err)
236 defer nonTransactionalProducer.Close()
237
238 // Ensure consumer is started
239 nonTransactionalProducer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder("test")}
240 <-msgChannel
241
242 producer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config)
243 require.NoError(t, err)
244 defer producer.Close()
245
246 txCoordinator, _ := producer.(*asyncProducer).client.TransactionCoordinator(config.Producer.Transaction.ID)
247
248 err = producer.BeginTxn()
249 require.NoError(t, err)
250
251 defer func() {
252 if err := startDockerTestBroker(context.Background(), txCoordinator.id); err != nil {
253 t.Fatal(err)
254 }
255 t.Logf("\n")
256 }()
257 if err := stopDockerTestBroker(context.Background(), txCoordinator.id); err != nil {
258 t.Fatal(err)
259 }
260
261 for i := 0; i < 1; i++ {
262 producer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder("test")}
263 }
264
265 err = producer.CommitTxn()
266 require.NoError(t, err)

Callers

nothing calls this directly

Calls 15

CloseMethod · 0.95
ConsumePartitionMethod · 0.95
CloseMethod · 0.95
InputMethod · 0.95
BeginTxnMethod · 0.95
CommitTxnMethod · 0.95
checkKafkaVersionFunction · 0.85
setupFunctionalTestFunction · 0.85
teardownFunctionalTestFunction · 0.85
NewFunctionalTestConfigFunction · 0.85
StringEncoderTypeAlias · 0.85
startDockerTestBrokerFunction · 0.85

Tested by

no test coverage detected