MCPcopy
hub / github.com/IBM/sarama / TestFuncTxnProduceMultiTxn

Function TestFuncTxnProduceMultiTxn

functional_producer_test.go:516–597  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

514}
515
516func TestFuncTxnProduceMultiTxn(t *testing.T) {
517 checkKafkaVersion(t, "0.11.0.0")
518 setupFunctionalTest(t)
519 defer teardownFunctionalTest(t)
520
521 config := NewFunctionalTestConfig()
522 config.ChannelBufferSize = 20
523 config.Producer.Flush.Frequency = 50 * time.Millisecond
524 config.Producer.Flush.Messages = 200
525 config.Producer.Idempotent = true
526 config.Producer.Transaction.ID = "TestFuncTxnProduceMultiTxn"
527 config.Producer.RequiredAcks = WaitForAll
528 config.Producer.Transaction.Retry.Max = 200
529 config.Consumer.IsolationLevel = ReadCommitted
530 config.Net.MaxOpenRequests = 1
531
532 configSecond := NewFunctionalTestConfig()
533 configSecond.ChannelBufferSize = 20
534 configSecond.Producer.Flush.Frequency = 50 * time.Millisecond
535 configSecond.Producer.Flush.Messages = 200
536 configSecond.Producer.Idempotent = true
537 configSecond.Producer.Transaction.ID = "TestFuncTxnProduceMultiTxn-second"
538 configSecond.Producer.RequiredAcks = WaitForAll
539 configSecond.Producer.Retry.Max = 50
540 configSecond.Consumer.IsolationLevel = ReadCommitted
541 configSecond.Net.MaxOpenRequests = 1
542
543 consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, config)
544 require.NoError(t, err)
545 defer consumer.Close()
546
547 pc, err := consumer.ConsumePartition("test.1", 0, OffsetNewest)
548 require.NoError(t, err)
549 msgChannel := pc.Messages()
550 defer pc.Close()
551
552 nonTransactionalConfig := NewFunctionalTestConfig()
553 nonTransactionalConfig.Producer.Return.Successes = true
554 nonTransactionalConfig.Producer.Return.Errors = true
555
556 nonTransactionalProducer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, nonTransactionalConfig)
557 require.NoError(t, err)
558 defer nonTransactionalProducer.Close()
559
560 // Ensure consumer is started
561 nonTransactionalProducer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder("test")}
562 <-msgChannel
563
564 producer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config)
565 require.NoError(t, err)
566 defer producer.Close()
567
568 producerSecond, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, configSecond)
569 require.NoError(t, err)
570 defer producerSecond.Close()
571
572 err = producer.BeginTxn()
573 require.NoError(t, err)

Callers

nothing calls this directly

Calls 15

CloseMethod · 0.95
ConsumePartitionMethod · 0.95
CloseMethod · 0.95
InputMethod · 0.95
BeginTxnMethod · 0.95
CommitTxnMethod · 0.95
AbortTxnMethod · 0.95
checkKafkaVersionFunction · 0.85
setupFunctionalTestFunction · 0.85
teardownFunctionalTestFunction · 0.85
NewFunctionalTestConfigFunction · 0.85
StringEncoderTypeAlias · 0.85

Tested by

no test coverage detected