TestAsyncProducerRetryOrdering verifies that message ordering is preserved during retries, both with and without request pipelining (MaxOpenRequests=1 vs >1).
(t *testing.T)
| 2925 | // TestAsyncProducerRetryOrdering verifies that message ordering is preserved during retries, |
| 2926 | // both with and without request pipelining (MaxOpenRequests=1 vs >1). |
| 2927 | func TestAsyncProducerRetryOrdering(t *testing.T) { |
| 2928 | const topic = "my_topic" |
| 2929 | |
| 2930 | extractValue := func(pr *ProduceRequest) string { |
| 2931 | recordsByPartition := pr.records[topic] |
| 2932 | if recordsByPartition == nil { |
| 2933 | return "" |
| 2934 | } |
| 2935 | records := recordsByPartition[0] |
| 2936 | if rb := records.RecordBatch; rb != nil && len(rb.Records) > 0 { |
| 2937 | return string(rb.Records[0].Value) |
| 2938 | } |
| 2939 | if ms := records.MsgSet; ms != nil && len(ms.Messages) > 0 { |
| 2940 | return string(ms.Messages[0].Msg.Value) |
| 2941 | } |
| 2942 | return "" |
| 2943 | } |
| 2944 | |
| 2945 | tests := []struct { |
| 2946 | name string |
| 2947 | maxOpenRequests int |
| 2948 | retryBackoff time.Duration |
| 2949 | }{ |
| 2950 | { |
| 2951 | name: "no pipelining (MaxOpenRequests=1)", |
| 2952 | maxOpenRequests: 1, |
| 2953 | retryBackoff: 0, |
| 2954 | }, |
| 2955 | { |
| 2956 | name: "with pipelining (MaxOpenRequests=5)", |
| 2957 | maxOpenRequests: 5, |
| 2958 | retryBackoff: 50 * time.Millisecond, |
| 2959 | }, |
| 2960 | } |
| 2961 | |
| 2962 | for _, tt := range tests { |
| 2963 | t.Run(tt.name, func(t *testing.T) { |
| 2964 | seedBroker := NewMockBroker(t, 1) |
| 2965 | leader := NewMockBroker(t, 2) |
| 2966 | |
| 2967 | metadataResponse := new(MetadataResponse) |
| 2968 | metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) |
| 2969 | metadataResponse.AddTopicPartition(topic, 0, leader.BrokerID(), nil, nil, nil, ErrNoError) |
| 2970 | seedBroker.Returns(metadataResponse) |
| 2971 | |
| 2972 | var ( |
| 2973 | mu sync.Mutex |
| 2974 | produceAttempts int |
| 2975 | valuesSeen []string |
| 2976 | ) |
| 2977 | |
| 2978 | leader.setHandler(func(req *request) (res encoderWithHeader) { |
| 2979 | switch typed := req.body.(type) { |
| 2980 | case *MetadataRequest: |
| 2981 | return metadataResponse |
| 2982 | case *ProduceRequest: |
| 2983 | mu.Lock() |
| 2984 | defer mu.Unlock() |
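Nothing calls this function directly; as a `Test*` function taking `*testing.T`, it is invoked by the Go test runner.
No test coverage detected (expected, since this function is itself a test).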