test case for https://github.com/IBM/sarama/issues/2469: idempotent producer retries via retryBatch must honor Retry.Backoff / Retry.BackoffFunc when the broker repeatedly returns a retriable error.
(t *testing.T)
| 1450 | // retries via retryBatch must honor Retry.Backoff / Retry.BackoffFunc when the |
| 1451 | // broker repeatedly returns a retriable error. |
| 1452 | func TestAsyncProducerIdempotentRetryBatchBackoff(t *testing.T) { |
| 1453 | broker := NewMockBroker(t, 1) |
| 1454 | |
| 1455 | metadataResponse := &MetadataResponse{ |
| 1456 | Version: 4, |
| 1457 | ControllerID: 1, |
| 1458 | } |
| 1459 | metadataResponse.AddBroker(broker.Addr(), broker.BrokerID()) |
| 1460 | metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError) |
| 1461 | |
| 1462 | initProducerIDResponse := &InitProducerIDResponse{ |
| 1463 | ThrottleTime: 0, |
| 1464 | ProducerID: 1000, |
| 1465 | ProducerEpoch: 1, |
| 1466 | } |
| 1467 | |
| 1468 | prodNotLeaderResponse := &ProduceResponse{ |
| 1469 | Version: 3, |
| 1470 | ThrottleTime: 0, |
| 1471 | } |
| 1472 | prodNotLeaderResponse.AddTopicPartition("my_topic", 0, ErrNotEnoughReplicas) |
| 1473 | |
| 1474 | handler := func(req *request) (res encoderWithHeader) { |
| 1475 | switch req.body.key() { |
| 1476 | case 3: |
| 1477 | return metadataResponse |
| 1478 | case 22: |
| 1479 | return initProducerIDResponse |
| 1480 | case 0: |
| 1481 | return prodNotLeaderResponse |
| 1482 | } |
| 1483 | return nil |
| 1484 | } |
| 1485 | |
| 1486 | config := NewTestConfig() |
| 1487 | config.Version = V0_11_0_0 |
| 1488 | config.Producer.Idempotent = true |
| 1489 | config.Net.MaxOpenRequests = 1 |
| 1490 | config.Producer.Retry.Max = 3 |
| 1491 | config.Producer.RequiredAcks = WaitForAll |
| 1492 | config.Producer.Return.Successes = true |
| 1493 | config.Producer.Flush.Frequency = 50 * time.Millisecond |
| 1494 | |
| 1495 | backoffCalls := new(atomic.Int32) |
| 1496 | config.Producer.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration { |
| 1497 | backoffCalls.Add(1) |
| 1498 | return 0 |
| 1499 | } |
| 1500 | |
| 1501 | broker.setHandler(handler) |
| 1502 | producer, err := NewAsyncProducer([]string{broker.Addr()}, config) |
| 1503 | require.NoError(t, err) |
| 1504 | |
| 1505 | producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} |
| 1506 | |
| 1507 | expectResults(t, producer, 0, 1) |
| 1508 | |
| 1509 | broker.Close() |
nothing calls this directly
no test coverage detected