(t *testing.T)
| 1789 | } |
| 1790 | |
| 1791 | func TestPartitionConsumerRetryMax(t *testing.T) { |
| 1792 | config := NewTestConfig() |
| 1793 | config.Consumer.Return.Errors = true |
| 1794 | config.Consumer.Retry.Max = 3 |
| 1795 | |
| 1796 | child := &partitionConsumer{ |
| 1797 | consumer: &consumer{}, |
| 1798 | conf: config, |
| 1799 | topic: "my_topic", |
| 1800 | partition: 7, |
| 1801 | errors: make(chan *ConsumerError, 1), |
| 1802 | feeder: make(chan *partitionConsumerResponse, 1), |
| 1803 | trigger: make(chan none, 1), |
| 1804 | dying: make(chan none), |
| 1805 | dispatcherStop: make(chan none), |
| 1806 | } |
| 1807 | child.retries.Store(int32(config.Consumer.Retry.Max)) |
| 1808 | |
| 1809 | done := make(chan none) |
| 1810 | go func() { |
| 1811 | defer close(done) |
| 1812 | child.dispatcher() |
| 1813 | }() |
| 1814 | |
| 1815 | child.trigger <- none{} |
| 1816 | |
| 1817 | select { |
| 1818 | case err := <-child.errors: |
| 1819 | require.ErrorIs(t, err.Err, ErrConsumerRetriesExhausted) |
| 1820 | case <-time.After(2 * time.Second): |
| 1821 | require.FailNow(t, "expected ErrConsumerRetriesExhausted on errors channel") |
| 1822 | } |
| 1823 | |
| 1824 | select { |
| 1825 | case <-done: |
| 1826 | case <-time.After(2 * time.Second): |
| 1827 | require.FailNow(t, "dispatcher did not exit after Retry.Max") |
| 1828 | } |
| 1829 | } |
| 1830 | |
| 1831 | func TestPartitionConsumerComputeBackoff(t *testing.T) { |
| 1832 | newChild := func() *partitionConsumer { |
nothing calls this directly
no test coverage detected