MCPcopy
hub / github.com/IBM/sarama / TestAsyncProducerIdempotentEpochRollover

Function TestAsyncProducerIdempotentEpochRollover

async_producer_test.go:1565–1632  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

1563}
1564
1565func TestAsyncProducerIdempotentEpochRollover(t *testing.T) {
1566 broker := NewMockBroker(t, 1)
1567 defer broker.Close()
1568
1569 metadataResponse := &MetadataResponse{
1570 Version: 4,
1571 ControllerID: 1,
1572 }
1573 metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
1574 metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError)
1575 broker.Returns(metadataResponse)
1576
1577 initProducerID := &InitProducerIDResponse{
1578 ThrottleTime: 0,
1579 ProducerID: 1000,
1580 ProducerEpoch: 1,
1581 }
1582 broker.Returns(initProducerID)
1583
1584 config := NewTestConfig()
1585 config.Producer.Flush.Messages = 10
1586 config.Producer.Flush.Frequency = 10 * time.Millisecond
1587 config.Producer.Return.Successes = true
1588 config.Producer.Retry.Max = 1 // This test needs to exercise what happens when retries exhaust
1589 config.Producer.RequiredAcks = WaitForAll
1590 config.Producer.Retry.Backoff = 0
1591 config.Producer.Idempotent = true
1592 config.Net.MaxOpenRequests = 1
1593 config.Version = V0_11_0_0
1594
1595 producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
1596 if err != nil {
1597 t.Fatal(err)
1598 }
1599 defer closeProducer(t, producer)
1600
1601 producer.Input() <- &ProducerMessage{Topic: "my_topic", Value: StringEncoder("hello")}
1602 prodError := &ProduceResponse{
1603 Version: 3,
1604 ThrottleTime: 0,
1605 }
1606 prodError.AddTopicPartition("my_topic", 0, ErrBrokerNotAvailable)
1607 broker.Returns(prodError)
1608 <-producer.Errors()
1609
1610 lastReqRes := broker.history[len(broker.history)-1]
1611 lastProduceBatch := lastReqRes.Request.(*ProduceRequest).records["my_topic"][0].RecordBatch
1612 if lastProduceBatch.FirstSequence != 0 {
1613 t.Error("first sequence not zero")
1614 }
1615 if lastProduceBatch.ProducerEpoch != 1 {
1616 t.Error("first epoch was not one")
1617 }
1618
1619 // Now if we produce again, the epoch should have rolled over.
1620 producer.Input() <- &ProducerMessage{Topic: "my_topic", Value: StringEncoder("hello")}
1621 broker.Returns(prodError)
1622 <-producer.Errors()

Callers

nothing calls this directly

Calls 15

CloseMethod · 0.95
AddBrokerMethod · 0.95
AddrMethod · 0.95
BrokerIDMethod · 0.95
AddTopicPartitionMethod · 0.95
ReturnsMethod · 0.95
InputMethod · 0.95
AddTopicPartitionMethod · 0.95
ErrorsMethod · 0.95
NewMockBrokerFunction · 0.85
closeProducerFunction · 0.85
StringEncoderTypeAlias · 0.85

Tested by

no test coverage detected