(t *testing.T)
| 797 | } |
| 798 | |
| 799 | func TestAsyncProducerBrokerRestart(t *testing.T) { |
| 800 | // Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags) |
| 801 | |
| 802 | seedBroker := NewMockBroker(t, 1) |
| 803 | leader := NewMockBroker(t, 2) |
| 804 | |
| 805 | var leaderLock sync.Mutex |
| 806 | metadataRequestHandlerFunc := func(req *request) (res encoderWithHeader) { |
| 807 | leaderLock.Lock() |
| 808 | defer leaderLock.Unlock() |
| 809 | metadataLeader := new(MetadataResponse) |
| 810 | metadataLeader.AddBroker(leader.Addr(), leader.BrokerID()) |
| 811 | metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) |
| 812 | return metadataLeader |
| 813 | } |
| 814 | |
| 815 | // The seed broker only handles Metadata request in bootstrap |
| 816 | seedBroker.setHandler(metadataRequestHandlerFunc) |
| 817 | |
| 818 | var emptyValues atomic.Int32 |
| 819 | |
| 820 | countRecordsWithEmptyValue := func(req *request) { |
| 821 | preq := req.body.(*ProduceRequest) |
| 822 | if batch := preq.records["my_topic"][0].RecordBatch; batch != nil { |
| 823 | for _, record := range batch.Records { |
| 824 | if len(record.Value) == 0 { |
| 825 | emptyValues.Add(1) |
| 826 | } |
| 827 | } |
| 828 | } |
| 829 | if batch := preq.records["my_topic"][0].MsgSet; batch != nil { |
| 830 | for _, record := range batch.Messages { |
| 831 | if len(record.Msg.Value) == 0 { |
| 832 | emptyValues.Add(1) |
| 833 | } |
| 834 | } |
| 835 | } |
| 836 | } |
| 837 | |
| 838 | failedProduceRequestHandlerFunc := func(req *request) (res encoderWithHeader) { |
| 839 | countRecordsWithEmptyValue(req) |
| 840 | |
| 841 | time.Sleep(50 * time.Millisecond) |
| 842 | |
| 843 | prodSuccess := new(ProduceResponse) |
| 844 | prodSuccess.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition) |
| 845 | return prodSuccess |
| 846 | } |
| 847 | |
| 848 | succeededProduceRequestHandlerFunc := func(req *request) (res encoderWithHeader) { |
| 849 | countRecordsWithEmptyValue(req) |
| 850 | |
| 851 | prodSuccess := new(ProduceResponse) |
| 852 | prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) |
| 853 | return prodSuccess |
| 854 | } |
| 855 | |
| 856 | leader.SetHandlerFuncByMap(map[string]requestHandlerFunc{ |
nothing calls this directly
no test coverage detected