(t *testing.T)
| 1829 | } |
| 1830 | |
| 1831 | func TestPartitionConsumerComputeBackoff(t *testing.T) { |
| 1832 | newChild := func() *partitionConsumer { |
| 1833 | return &partitionConsumer{ |
| 1834 | conf: NewTestConfig(), |
| 1835 | topic: "my_topic", |
| 1836 | partition: 7, |
| 1837 | } |
| 1838 | } |
| 1839 | |
| 1840 | t.Run("returns configured backoff", func(t *testing.T) { |
| 1841 | child := newChild() |
| 1842 | child.conf.Consumer.Retry.Backoff = 7 * time.Millisecond |
| 1843 | require.Equal(t, 7*time.Millisecond, child.computeBackoff()) |
| 1844 | }) |
| 1845 | |
| 1846 | t.Run("BackoffFunc receives consecutive retry counts", func(t *testing.T) { |
| 1847 | child := newChild() |
| 1848 | var seen []int |
| 1849 | child.conf.Consumer.Retry.BackoffFunc = func(retries int) time.Duration { |
| 1850 | seen = append(seen, retries) |
| 1851 | return 0 |
| 1852 | } |
| 1853 | for range 3 { |
| 1854 | child.computeBackoff() |
| 1855 | } |
| 1856 | require.Equal(t, []int{1, 2, 3}, seen) |
| 1857 | }) |
| 1858 | |
| 1859 | t.Run("emits stuck warning at threshold and each multiple", func(t *testing.T) { |
| 1860 | child := newChild() |
| 1861 | var buf bytes.Buffer |
| 1862 | orig := Logger |
| 1863 | Logger = log.New(&buf, "", 0) |
| 1864 | t.Cleanup(func() { Logger = orig }) |
| 1865 | |
| 1866 | expectStuckWarning := func(want string) { |
| 1867 | t.Helper() |
| 1868 | buf.Reset() |
| 1869 | for i := 1; i < stuckRetryThreshold; i++ { |
| 1870 | child.computeBackoff() |
| 1871 | } |
| 1872 | require.NotContains(t, buf.String(), "still retrying") |
| 1873 | child.computeBackoff() |
| 1874 | require.Contains(t, buf.String(), want) |
| 1875 | } |
| 1876 | |
| 1877 | expectStuckWarning("consumer/my_topic/7 still retrying after 10") |
| 1878 | expectStuckWarning("consumer/my_topic/7 still retrying after 20") |
| 1879 | }) |
| 1880 | } |
| 1881 | |
| 1882 | func TestConsumerTimestamps(t *testing.T) { |
| 1883 | now := time.Now().Truncate(time.Millisecond) |
nothing calls this directly
no test coverage detected