(t *testing.T)
| 2735 | } |
| 2736 | |
| 2737 | func TestProducerRetryBufferLimits(t *testing.T) { |
| 2738 | broker := NewMockBroker(t, 1) |
| 2739 | defer broker.Close() |
| 2740 | topic := "test-topic" |
| 2741 | |
| 2742 | metadataRequestHandlerFunc := func(req *request) (res encoderWithHeader) { |
| 2743 | r := new(MetadataResponse) |
| 2744 | r.AddBroker(broker.Addr(), broker.BrokerID()) |
| 2745 | r.AddTopicPartition(topic, 0, broker.BrokerID(), nil, nil, nil, ErrNoError) |
| 2746 | return r |
| 2747 | } |
| 2748 | |
| 2749 | produceRequestHandlerFunc := func(req *request) (res encoderWithHeader) { |
| 2750 | r := new(ProduceResponse) |
| 2751 | r.AddTopicPartition(topic, 0, ErrNotLeaderForPartition) |
| 2752 | return r |
| 2753 | } |
| 2754 | |
| 2755 | broker.SetHandlerFuncByMap(map[string]requestHandlerFunc{ |
| 2756 | "ProduceRequest": produceRequestHandlerFunc, |
| 2757 | "MetadataRequest": metadataRequestHandlerFunc, |
| 2758 | }) |
| 2759 | |
| 2760 | tests := []struct { |
| 2761 | name string |
| 2762 | configureBuffer func(*Config) |
| 2763 | messageSize int |
| 2764 | numMessages int |
| 2765 | }{ |
| 2766 | { |
| 2767 | name: "MaxBufferLength", |
| 2768 | configureBuffer: func(config *Config) { |
| 2769 | config.Producer.Flush.MaxMessages = 1 |
| 2770 | config.Producer.Retry.MaxBufferLength = minFunctionalRetryBufferLength |
| 2771 | }, |
| 2772 | messageSize: 1, // Small message size |
| 2773 | numMessages: 10000, |
| 2774 | }, |
| 2775 | { |
| 2776 | name: "MaxBufferBytes", |
| 2777 | configureBuffer: func(config *Config) { |
| 2778 | config.Producer.Flush.MaxMessages = 1 |
| 2779 | config.Producer.Retry.MaxBufferBytes = minFunctionalRetryBufferBytes |
| 2780 | }, |
| 2781 | messageSize: 950 * 1024, // 950 KB |
| 2782 | numMessages: 1000, |
| 2783 | }, |
| 2784 | } |
| 2785 | |
| 2786 | for _, tt := range tests { |
| 2787 | t.Run(tt.name, func(t *testing.T) { |
| 2788 | config := NewTestConfig() |
| 2789 | config.Producer.Return.Successes = true |
| 2790 | tt.configureBuffer(config) |
| 2791 | |
| 2792 | producer, err := NewAsyncProducer([]string{broker.Addr()}, config) |
| 2793 | if err != nil { |
| 2794 | t.Fatal(err) |
nothing calls this directly
no test coverage detected