TestAsyncProducerIdempotentEpochExhaustion ensures that producer requests a new producerID when producerEpoch is exhausted
(t *testing.T)
| 1634 | // TestAsyncProducerIdempotentEpochExhaustion ensures that producer requests |
| 1635 | // a new producerID when producerEpoch is exhausted |
| 1636 | func TestAsyncProducerIdempotentEpochExhaustion(t *testing.T) { |
| 1637 | broker := NewMockBroker(t, 1) |
| 1638 | defer broker.Close() |
| 1639 | |
| 1640 | var ( |
| 1641 | initialProducerID = int64(1000) |
| 1642 | newProducerID = initialProducerID + 1 |
| 1643 | ) |
| 1644 | |
| 1645 | metadataResponse := &MetadataResponse{ |
| 1646 | Version: 4, |
| 1647 | ControllerID: 1, |
| 1648 | } |
| 1649 | metadataResponse.AddBroker(broker.Addr(), broker.BrokerID()) |
| 1650 | metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError) |
| 1651 | broker.Returns(metadataResponse) |
| 1652 | |
| 1653 | initProducerID := &InitProducerIDResponse{ |
| 1654 | ThrottleTime: 0, |
| 1655 | ProducerID: initialProducerID, |
| 1656 | ProducerEpoch: math.MaxInt16, // Mock ProducerEpoch at the exhaustion point |
| 1657 | } |
| 1658 | broker.Returns(initProducerID) |
| 1659 | |
| 1660 | config := NewTestConfig() |
| 1661 | config.Producer.Flush.Messages = 10 |
| 1662 | config.Producer.Flush.Frequency = 10 * time.Millisecond |
| 1663 | config.Producer.Return.Successes = true |
| 1664 | config.Producer.Retry.Max = 1 // This test needs to exercise what happens when retries exhaust |
| 1665 | config.Producer.RequiredAcks = WaitForAll |
| 1666 | config.Producer.Retry.Backoff = 0 |
| 1667 | config.Producer.Idempotent = true |
| 1668 | config.Net.MaxOpenRequests = 1 |
| 1669 | config.Version = V0_11_0_0 |
| 1670 | |
| 1671 | producer, err := NewAsyncProducer([]string{broker.Addr()}, config) |
| 1672 | if err != nil { |
| 1673 | t.Fatal(err) |
| 1674 | } |
| 1675 | defer closeProducer(t, producer) |
| 1676 | |
| 1677 | producer.Input() <- &ProducerMessage{Topic: "my_topic", Value: StringEncoder("hello")} |
| 1678 | prodError := &ProduceResponse{ |
| 1679 | Version: 3, |
| 1680 | ThrottleTime: 0, |
| 1681 | } |
| 1682 | prodError.AddTopicPartition("my_topic", 0, ErrBrokerNotAvailable) |
| 1683 | broker.Returns(prodError) |
| 1684 | broker.Returns(&InitProducerIDResponse{ |
| 1685 | ProducerID: newProducerID, |
| 1686 | }) |
| 1687 | |
| 1688 | <-producer.Errors() |
| 1689 | |
| 1690 | lastProduceReqRes := broker.history[len(broker.history)-2] // last is InitProducerIDRequest |
| 1691 | lastProduceBatch := lastProduceReqRes.Request.(*ProduceRequest).records["my_topic"][0].RecordBatch |
| 1692 | if lastProduceBatch.FirstSequence != 0 { |
| 1693 | t.Error("first sequence not zero") |
nothing calls this directly
no test coverage detected