Function TestAsyncProducerIdempotentRetryBatchBackoff

async_producer_test.go:1452–1514

Test case for https://github.com/IBM/sarama/issues/2469: idempotent producer retries that go through retryBatch must honor Retry.Backoff / Retry.BackoffFunc when the broker repeatedly returns a retriable error.

(t *testing.T)

Source from the content-addressed store, hash-verified

// test case for https://github.com/IBM/sarama/issues/2469: idempotent producer
// retries via retryBatch must honor Retry.Backoff / Retry.BackoffFunc when the
// broker repeatedly returns a retriable error.
func TestAsyncProducerIdempotentRetryBatchBackoff(t *testing.T) {
	broker := NewMockBroker(t, 1)

	metadataResponse := &MetadataResponse{
		Version:      4,
		ControllerID: 1,
	}
	metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
	metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError)

	initProducerIDResponse := &InitProducerIDResponse{
		ThrottleTime:  0,
		ProducerID:    1000,
		ProducerEpoch: 1,
	}

	// Every produce request is answered with the retriable ErrNotEnoughReplicas.
	prodNotLeaderResponse := &ProduceResponse{
		Version:      3,
		ThrottleTime: 0,
	}
	prodNotLeaderResponse.AddTopicPartition("my_topic", 0, ErrNotEnoughReplicas)

	// Dispatch on the Kafka API key of the incoming request.
	handler := func(req *request) (res encoderWithHeader) {
		switch req.body.key() {
		case 3: // Metadata
			return metadataResponse
		case 22: // InitProducerId
			return initProducerIDResponse
		case 0: // Produce
			return prodNotLeaderResponse
		}
		return nil
	}

	config := NewTestConfig()
	config.Version = V0_11_0_0
	config.Producer.Idempotent = true
	config.Net.MaxOpenRequests = 1
	config.Producer.Retry.Max = 3
	config.Producer.RequiredAcks = WaitForAll
	config.Producer.Return.Successes = true
	config.Producer.Flush.Frequency = 50 * time.Millisecond

	// Count backoff invocations; returning 0 keeps the retries immediate.
	backoffCalls := new(atomic.Int32)
	config.Producer.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration {
		backoffCalls.Add(1)
		return 0
	}

	broker.setHandler(handler)
	producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
	require.NoError(t, err)

	producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}

	// Expect 0 successes and 1 error once the retries are exhausted.
	expectResults(t, producer, 0, 1)

	broker.Close()
	// ... (rest of function not shown)

Callers

nothing calls this directly

Calls 15

AddBroker · Method · 0.95
Addr · Method · 0.95
BrokerID · Method · 0.95
AddTopicPartition · Method · 0.95
AddTopicPartition · Method · 0.95
setHandler · Method · 0.95
Input · Method · 0.95
Close · Method · 0.95
NewMockBroker · Function · 0.85
StringEncoder · TypeAlias · 0.85
expectResults · Function · 0.85
closeProducer · Function · 0.85

Tested by

no test coverage detected