(t *testing.T)
| 763 | } |
| 764 | |
| 765 | func TestFuncIdempotentBufferedSequence(t *testing.T) { |
| 766 | checkKafkaVersion(t, "0.11.0.0") |
| 767 | setupFunctionalTest(t) |
| 768 | defer teardownFunctionalTest(t) |
| 769 | |
| 770 | const ( |
| 771 | topic = "test.1" |
| 772 | partition int32 = 0 |
| 773 | ) |
| 774 | |
| 775 | cfg := NewFunctionalTestConfig() |
| 776 | cfg.Net.MaxOpenRequests = 1 |
| 777 | cfg.Producer.Idempotent = true |
| 778 | cfg.Producer.RequiredAcks = WaitForAll |
| 779 | cfg.Producer.Return.Successes = true |
| 780 | cfg.Producer.Return.Errors = true |
| 781 | cfg.Producer.Retry.Max = 64 |
| 782 | cfg.Producer.Retry.Backoff = 250 * time.Millisecond |
| 783 | |
| 784 | start := time.Now() |
| 785 | |
| 786 | producer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, cfg) |
| 787 | require.NoError(t, err) |
| 788 | defer producer.Close() |
| 789 | |
| 790 | asyncProd, ok := producer.(*asyncProducer) |
| 791 | require.True(t, ok) |
| 792 | |
| 793 | waitForMessages := func(count int) { |
| 794 | timeout := time.After(2 * time.Minute) |
| 795 | for count > 0 { |
| 796 | select { |
| 797 | case <-timeout: |
| 798 | t.Fatalf("timed out waiting for %d messages", count) |
| 799 | case perr := <-producer.Errors(): |
| 800 | if perr != nil { |
| 801 | t.Logf("producer error: %v", perr.Err) |
| 802 | } |
| 803 | count-- |
| 804 | case <-producer.Successes(): |
| 805 | count-- |
| 806 | } |
| 807 | } |
| 808 | } |
| 809 | |
| 810 | for i := 0; i < 5; i++ { |
| 811 | producer.Input() <- &ProducerMessage{ |
| 812 | Topic: topic, |
| 813 | Partition: partition, |
| 814 | Value: StringEncoder(fmt.Sprintf("warmup-%d", i)), |
| 815 | } |
| 816 | } |
| 817 | waitForMessages(5) |
| 818 | |
| 819 | leader, err := asyncProd.client.Leader(topic, partition) |
| 820 | require.NoError(t, err) |
| 821 | |
| 822 | bp := asyncProd.getBrokerProducer(leader) |
nothing calls this directly
no test coverage detected