MCPcopy
hub / github.com/IBM/sarama / TestAsyncProducerIdempotentRetryCheckBatch_2378

Function TestAsyncProducerIdempotentRetryCheckBatch_2378

async_producer_test.go:1381–1447  ·  view source on GitHub ↗

test case for https://github.com/IBM/sarama/pull/2378

(t *testing.T)

Source from the content-addressed store, hash-verified

1379
1380// test case for https://github.com/IBM/sarama/pull/2378
1381func TestAsyncProducerIdempotentRetryCheckBatch_2378(t *testing.T) {
1382 broker := NewMockBroker(t, 1)
1383
1384 metadataResponse := &MetadataResponse{
1385 Version: 4,
1386 ControllerID: 1,
1387 }
1388 metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
1389 metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError)
1390
1391 initProducerIDResponse := &InitProducerIDResponse{
1392 ThrottleTime: 0,
1393 ProducerID: 1000,
1394 ProducerEpoch: 1,
1395 }
1396
1397 prodNotLeaderResponse := &ProduceResponse{
1398 Version: 3,
1399 ThrottleTime: 0,
1400 }
1401 prodNotLeaderResponse.AddTopicPartition("my_topic", 0, ErrNotEnoughReplicas)
1402
1403 handlerFailBeforeWrite := func(req *request) (res encoderWithHeader) {
1404 switch req.body.key() {
1405 case 3:
1406 return metadataResponse
1407 case 22:
1408 return initProducerIDResponse
1409 case 0: // for msg, always return error to trigger retryBatch
1410 return prodNotLeaderResponse
1411 }
1412 return nil
1413 }
1414
1415 config := NewTestConfig()
1416 config.Version = V0_11_0_0
1417 config.Producer.Idempotent = true
1418 config.Net.MaxOpenRequests = 1
1419 config.Producer.Retry.Max = 1 // set max retry to 1
1420 config.Producer.RequiredAcks = WaitForAll
1421 config.Producer.Return.Successes = true
1422 config.Producer.Flush.Frequency = 50 * time.Millisecond
1423 config.Producer.Retry.Backoff = 100 * time.Millisecond
1424
1425 broker.setHandler(handlerFailBeforeWrite)
1426 producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
1427 if err != nil {
1428 t.Fatal(err)
1429 }
1430
1431 for range 3 {
1432 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
1433 }
1434
1435 go func() {
1436 for range 7 {
1437 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("goroutine")}
1438 time.Sleep(100 * time.Millisecond)

Callers

nothing calls this directly

Calls 15

AddBroker Method · 0.95
Addr Method · 0.95
BrokerID Method · 0.95
AddTopicPartition Method · 0.95
AddTopicPartition Method · 0.95
setHandler Method · 0.95
Input Method · 0.95
Close Method · 0.95
NewMockBroker Function · 0.85
StringEncoder TypeAlias · 0.85
expectResults Function · 0.85
closeProducer Function · 0.85

Tested by

no test coverage detected