MCPcopy
hub / github.com/IBM/sarama / TestPartitionConsumerRetryMax

Function TestPartitionConsumerRetryMax

consumer_test.go:1791–1829  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

1789}
1790
1791func TestPartitionConsumerRetryMax(t *testing.T) {
1792 config := NewTestConfig()
1793 config.Consumer.Return.Errors = true
1794 config.Consumer.Retry.Max = 3
1795
1796 child := &partitionConsumer{
1797 consumer: &consumer{},
1798 conf: config,
1799 topic: "my_topic",
1800 partition: 7,
1801 errors: make(chan *ConsumerError, 1),
1802 feeder: make(chan *partitionConsumerResponse, 1),
1803 trigger: make(chan none, 1),
1804 dying: make(chan none),
1805 dispatcherStop: make(chan none),
1806 }
1807 child.retries.Store(int32(config.Consumer.Retry.Max))
1808
1809 done := make(chan none)
1810 go func() {
1811 defer close(done)
1812 child.dispatcher()
1813 }()
1814
1815 child.trigger <- none{}
1816
1817 select {
1818 case err := <-child.errors:
1819 require.ErrorIs(t, err.Err, ErrConsumerRetriesExhausted)
1820 case <-time.After(2 * time.Second):
1821 require.FailNow(t, "expected ErrConsumerRetriesExhausted on errors channel")
1822 }
1823
1824 select {
1825 case <-done:
1826 case <-time.After(2 * time.Second):
1827 require.FailNow(t, "dispatcher did not exit after Retry.Max")
1828 }
1829}
1830
1831func TestPartitionConsumerComputeBackoff(t *testing.T) {
1832 newChild := func() *partitionConsumer {

Callers

nothing calls this directly

Calls 2

dispatcherMethod · 0.95
NewTestConfigFunction · 0.70

Tested by

no test coverage detected