MCPcopy
hub / github.com/IBM/sarama / TestAsyncProducerIdempotentEpochExhaustion

Function TestAsyncProducerIdempotentEpochExhaustion

async_producer_test.go:1636–1709  ·  view source on GitHub ↗

TestAsyncProducerIdempotentEpochExhaustion ensures that producer requests a new producerID when producerEpoch is exhausted

(t *testing.T)

Source from the content-addressed store, hash-verified

1634// TestAsyncProducerIdempotentEpochExhaustion ensures that producer requests
1635// a new producerID when producerEpoch is exhausted
1636func TestAsyncProducerIdempotentEpochExhaustion(t *testing.T) {
1637 broker := NewMockBroker(t, 1)
1638 defer broker.Close()
1639
1640 var (
1641 initialProducerID = int64(1000)
1642 newProducerID = initialProducerID + 1
1643 )
1644
1645 metadataResponse := &MetadataResponse{
1646 Version: 4,
1647 ControllerID: 1,
1648 }
1649 metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
1650 metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError)
1651 broker.Returns(metadataResponse)
1652
1653 initProducerID := &InitProducerIDResponse{
1654 ThrottleTime: 0,
1655 ProducerID: initialProducerID,
1656 ProducerEpoch: math.MaxInt16, // Mock ProducerEpoch at the exhaustion point
1657 }
1658 broker.Returns(initProducerID)
1659
1660 config := NewTestConfig()
1661 config.Producer.Flush.Messages = 10
1662 config.Producer.Flush.Frequency = 10 * time.Millisecond
1663 config.Producer.Return.Successes = true
1664 config.Producer.Retry.Max = 1 // This test needs to exercise what happens when retries exhaust
1665 config.Producer.RequiredAcks = WaitForAll
1666 config.Producer.Retry.Backoff = 0
1667 config.Producer.Idempotent = true
1668 config.Net.MaxOpenRequests = 1
1669 config.Version = V0_11_0_0
1670
1671 producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
1672 if err != nil {
1673 t.Fatal(err)
1674 }
1675 defer closeProducer(t, producer)
1676
1677 producer.Input() <- &ProducerMessage{Topic: "my_topic", Value: StringEncoder("hello")}
1678 prodError := &ProduceResponse{
1679 Version: 3,
1680 ThrottleTime: 0,
1681 }
1682 prodError.AddTopicPartition("my_topic", 0, ErrBrokerNotAvailable)
1683 broker.Returns(prodError)
1684 broker.Returns(&InitProducerIDResponse{
1685 ProducerID: newProducerID,
1686 })
1687
1688 <-producer.Errors()
1689
1690 lastProduceReqRes := broker.history[len(broker.history)-2] // last is InitProducerIDRequest
1691 lastProduceBatch := lastProduceReqRes.Request.(*ProduceRequest).records["my_topic"][0].RecordBatch
1692 if lastProduceBatch.FirstSequence != 0 {
1693 t.Error("first sequence not zero")

Callers

nothing calls this directly

Calls 15

CloseMethod · 0.95
AddBrokerMethod · 0.95
AddrMethod · 0.95
BrokerIDMethod · 0.95
AddTopicPartitionMethod · 0.95
ReturnsMethod · 0.95
InputMethod · 0.95
AddTopicPartitionMethod · 0.95
ErrorsMethod · 0.95
NewMockBrokerFunction · 0.85
closeProducerFunction · 0.85
StringEncoderTypeAlias · 0.85

Tested by

no test coverage detected