MCPcopy
hub / github.com/IBM/sarama / TestAsyncProducerBrokerRestart

Function TestAsyncProducerBrokerRestart

async_producer_test.go:799–914  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

797}
798
799func TestAsyncProducerBrokerRestart(t *testing.T) {
800 // Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
801
802 seedBroker := NewMockBroker(t, 1)
803 leader := NewMockBroker(t, 2)
804
805 var leaderLock sync.Mutex
806 metadataRequestHandlerFunc := func(req *request) (res encoderWithHeader) {
807 leaderLock.Lock()
808 defer leaderLock.Unlock()
809 metadataLeader := new(MetadataResponse)
810 metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
811 metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
812 return metadataLeader
813 }
814
815 // The seed broker only handles Metadata requests during bootstrap
816 seedBroker.setHandler(metadataRequestHandlerFunc)
817
818 var emptyValues atomic.Int32
819
820 countRecordsWithEmptyValue := func(req *request) {
821 preq := req.body.(*ProduceRequest)
822 if batch := preq.records["my_topic"][0].RecordBatch; batch != nil {
823 for _, record := range batch.Records {
824 if len(record.Value) == 0 {
825 emptyValues.Add(1)
826 }
827 }
828 }
829 if batch := preq.records["my_topic"][0].MsgSet; batch != nil {
830 for _, record := range batch.Messages {
831 if len(record.Msg.Value) == 0 {
832 emptyValues.Add(1)
833 }
834 }
835 }
836 }
837
838 failedProduceRequestHandlerFunc := func(req *request) (res encoderWithHeader) {
839 countRecordsWithEmptyValue(req)
840
841 time.Sleep(50 * time.Millisecond)
842
843 prodSuccess := new(ProduceResponse)
844 prodSuccess.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
845 return prodSuccess
846 }
847
848 succeededProduceRequestHandlerFunc := func(req *request) (res encoderWithHeader) {
849 countRecordsWithEmptyValue(req)
850
851 prodSuccess := new(ProduceResponse)
852 prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
853 return prodSuccess
854 }
855
856 leader.SetHandlerFuncByMap(map[string]requestHandlerFunc{

Callers

nothing calls this directly

Calls 15

Addr · Method · 0.95
BrokerID · Method · 0.95
setHandler · Method · 0.95
SetHandlerFuncByMap · Method · 0.95
Input · Method · 0.95
Close · Method · 0.95
NewMockBroker · Function · 0.85
StringEncoder · TypeAlias · 0.85
expectResultsWithTimeout · Function · 0.85
closeProducerWithTimeout · Function · 0.85
AddBroker · Method · 0.80
Fatal · Method · 0.80

Tested by

no test coverage detected