MCPcopy
hub / github.com/IBM/sarama / TestAsyncProducerRetryOrdering

Function TestAsyncProducerRetryOrdering

async_producer_test.go:2927–3049  ·  view source on GitHub ↗

TestAsyncProducerRetryOrdering verifies that message ordering is preserved during retries, both with and without request pipelining (MaxOpenRequests=1 vs >1).

(t *testing.T)

Source from the content-addressed store, hash-verified

2925// TestAsyncProducerRetryOrdering verifies that message ordering is preserved during retries,
2926// both with and without request pipelining (MaxOpenRequests=1 vs >1).
2927func TestAsyncProducerRetryOrdering(t *testing.T) {
2928 const topic = "my_topic"
2929
2930 extractValue := func(pr *ProduceRequest) string {
2931 recordsByPartition := pr.records[topic]
2932 if recordsByPartition == nil {
2933 return ""
2934 }
2935 records := recordsByPartition[0]
2936 if rb := records.RecordBatch; rb != nil && len(rb.Records) > 0 {
2937 return string(rb.Records[0].Value)
2938 }
2939 if ms := records.MsgSet; ms != nil && len(ms.Messages) > 0 {
2940 return string(ms.Messages[0].Msg.Value)
2941 }
2942 return ""
2943 }
2944
2945 tests := []struct {
2946 name string
2947 maxOpenRequests int
2948 retryBackoff time.Duration
2949 }{
2950 {
2951 name: "no pipelining (MaxOpenRequests=1)",
2952 maxOpenRequests: 1,
2953 retryBackoff: 0,
2954 },
2955 {
2956 name: "with pipelining (MaxOpenRequests=5)",
2957 maxOpenRequests: 5,
2958 retryBackoff: 50 * time.Millisecond,
2959 },
2960 }
2961
2962 for _, tt := range tests {
2963 t.Run(tt.name, func(t *testing.T) {
2964 seedBroker := NewMockBroker(t, 1)
2965 leader := NewMockBroker(t, 2)
2966
2967 metadataResponse := new(MetadataResponse)
2968 metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
2969 metadataResponse.AddTopicPartition(topic, 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
2970 seedBroker.Returns(metadataResponse)
2971
2972 var (
2973 mu sync.Mutex
2974 produceAttempts int
2975 valuesSeen []string
2976 )
2977
2978 leader.setHandler(func(req *request) (res encoderWithHeader) {
2979 switch typed := req.body.(type) {
2980 case *MetadataRequest:
2981 return metadataResponse
2982 case *ProduceRequest:
2983 mu.Lock()
2984 defer mu.Unlock()

Callers

nothing calls this directly

Calls 15

AddrMethod · 0.95
BrokerIDMethod · 0.95
ReturnsMethod · 0.95
setHandlerMethod · 0.95
InputMethod · 0.95
CloseMethod · 0.95
NewMockBrokerFunction · 0.85
StringEncoderTypeAlias · 0.85
expectResultsWithTimeoutFunction · 0.85
closeProducerFunction · 0.85
RunMethod · 0.80
AddBrokerMethod · 0.80

Tested by

no test coverage detected