(t *testing.T)
| 1563 | } |
| 1564 | |
| 1565 | func TestAsyncProducerIdempotentEpochRollover(t *testing.T) { |
| 1566 | broker := NewMockBroker(t, 1) |
| 1567 | defer broker.Close() |
| 1568 | |
| 1569 | metadataResponse := &MetadataResponse{ |
| 1570 | Version: 4, |
| 1571 | ControllerID: 1, |
| 1572 | } |
| 1573 | metadataResponse.AddBroker(broker.Addr(), broker.BrokerID()) |
| 1574 | metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError) |
| 1575 | broker.Returns(metadataResponse) |
| 1576 | |
| 1577 | initProducerID := &InitProducerIDResponse{ |
| 1578 | ThrottleTime: 0, |
| 1579 | ProducerID: 1000, |
| 1580 | ProducerEpoch: 1, |
| 1581 | } |
| 1582 | broker.Returns(initProducerID) |
| 1583 | |
| 1584 | config := NewTestConfig() |
| 1585 | config.Producer.Flush.Messages = 10 |
| 1586 | config.Producer.Flush.Frequency = 10 * time.Millisecond |
| 1587 | config.Producer.Return.Successes = true |
| 1588 | config.Producer.Retry.Max = 1 // This test needs to exercise what happens when retries exhaust |
| 1589 | config.Producer.RequiredAcks = WaitForAll |
| 1590 | config.Producer.Retry.Backoff = 0 |
| 1591 | config.Producer.Idempotent = true |
| 1592 | config.Net.MaxOpenRequests = 1 |
| 1593 | config.Version = V0_11_0_0 |
| 1594 | |
| 1595 | producer, err := NewAsyncProducer([]string{broker.Addr()}, config) |
| 1596 | if err != nil { |
| 1597 | t.Fatal(err) |
| 1598 | } |
| 1599 | defer closeProducer(t, producer) |
| 1600 | |
| 1601 | producer.Input() <- &ProducerMessage{Topic: "my_topic", Value: StringEncoder("hello")} |
| 1602 | prodError := &ProduceResponse{ |
| 1603 | Version: 3, |
| 1604 | ThrottleTime: 0, |
| 1605 | } |
| 1606 | prodError.AddTopicPartition("my_topic", 0, ErrBrokerNotAvailable) |
| 1607 | broker.Returns(prodError) |
| 1608 | <-producer.Errors() |
| 1609 | |
| 1610 | lastReqRes := broker.history[len(broker.history)-1] |
| 1611 | lastProduceBatch := lastReqRes.Request.(*ProduceRequest).records["my_topic"][0].RecordBatch |
| 1612 | if lastProduceBatch.FirstSequence != 0 { |
| 1613 | t.Error("first sequence not zero") |
| 1614 | } |
| 1615 | if lastProduceBatch.ProducerEpoch != 1 { |
| 1616 | t.Error("first epoch was not one") |
| 1617 | } |
| 1618 | |
| 1619 | // Now if we produce again, the epoch should have rolled over. |
| 1620 | producer.Input() <- &ProducerMessage{Topic: "my_topic", Value: StringEncoder("hello")} |
| 1621 | broker.Returns(prodError) |
| 1622 | <-producer.Errors() |
nothing calls this directly
no test coverage detected