MCPcopy
hub / github.com/IBM/sarama / TestFuncIdempotentBufferedSequence

Function TestFuncIdempotentBufferedSequence

functional_producer_test.go:765–867  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

763}
764
765func TestFuncIdempotentBufferedSequence(t *testing.T) {
766 checkKafkaVersion(t, "0.11.0.0")
767 setupFunctionalTest(t)
768 defer teardownFunctionalTest(t)
769
770 const (
771 topic = "test.1"
772 partition int32 = 0
773 )
774
775 cfg := NewFunctionalTestConfig()
776 cfg.Net.MaxOpenRequests = 1
777 cfg.Producer.Idempotent = true
778 cfg.Producer.RequiredAcks = WaitForAll
779 cfg.Producer.Return.Successes = true
780 cfg.Producer.Return.Errors = true
781 cfg.Producer.Retry.Max = 64
782 cfg.Producer.Retry.Backoff = 250 * time.Millisecond
783
784 start := time.Now()
785
786 producer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, cfg)
787 require.NoError(t, err)
788 defer producer.Close()
789
790 asyncProd, ok := producer.(*asyncProducer)
791 require.True(t, ok)
792
793 waitForMessages := func(count int) {
794 timeout := time.After(2 * time.Minute)
795 for count > 0 {
796 select {
797 case <-timeout:
798 t.Fatalf("timed out waiting for %d messages", count)
799 case perr := <-producer.Errors():
800 if perr != nil {
801 t.Logf("producer error: %v", perr.Err)
802 }
803 count--
804 case <-producer.Successes():
805 count--
806 }
807 }
808 }
809
810 for i := 0; i < 5; i++ {
811 producer.Input() <- &ProducerMessage{
812 Topic: topic,
813 Partition: partition,
814 Value: StringEncoder(fmt.Sprintf("warmup-%d", i)),
815 }
816 }
817 waitForMessages(5)
818
819 leader, err := asyncProd.client.Leader(topic, partition)
820 require.NoError(t, err)
821
822 bp := asyncProd.getBrokerProducer(leader)

Callers

nothing calls this directly

Calls 15

CloseMethod · 0.95
ErrorsMethod · 0.95
SuccessesMethod · 0.95
InputMethod · 0.95
flushRetryBuffersMethod · 0.95
checkKafkaVersionFunction · 0.85
setupFunctionalTestFunction · 0.85
teardownFunctionalTestFunction · 0.85
NewFunctionalTestConfigFunction · 0.85
StringEncoderTypeAlias · 0.85
FatalfMethod · 0.80
getBrokerProducerMethod · 0.80

Tested by

no test coverage detected