(t *testing.T)
| 1880 | } |
| 1881 | |
| 1882 | func TestConsumerTimestamps(t *testing.T) { |
| 1883 | now := time.Now().Truncate(time.Millisecond) |
| 1884 | type testMessage struct { |
| 1885 | key Encoder |
| 1886 | offset int64 |
| 1887 | timestamp time.Time |
| 1888 | } |
| 1889 | for _, d := range []struct { |
| 1890 | kversion KafkaVersion |
| 1891 | logAppendTime bool |
| 1892 | messages []testMessage |
| 1893 | expectedTimestamp []time.Time |
| 1894 | }{ |
| 1895 | {MinVersion, false, []testMessage{ |
| 1896 | {testMsg, 1, now}, |
| 1897 | {testMsg, 2, now}, |
| 1898 | }, []time.Time{{}, {}}}, |
| 1899 | {V0_9_0_0, false, []testMessage{ |
| 1900 | {testMsg, 1, now}, |
| 1901 | {testMsg, 2, now}, |
| 1902 | }, []time.Time{{}, {}}}, |
| 1903 | {V0_10_0_0, false, []testMessage{ |
| 1904 | {testMsg, 1, now}, |
| 1905 | {testMsg, 2, now}, |
| 1906 | }, []time.Time{{}, {}}}, |
| 1907 | {V0_10_2_1, false, []testMessage{ |
| 1908 | {testMsg, 1, now.Add(time.Second)}, |
| 1909 | {testMsg, 2, now.Add(2 * time.Second)}, |
| 1910 | }, []time.Time{now.Add(time.Second), now.Add(2 * time.Second)}}, |
| 1911 | {V0_10_2_1, true, []testMessage{ |
| 1912 | {testMsg, 1, now.Add(time.Second)}, |
| 1913 | {testMsg, 2, now.Add(2 * time.Second)}, |
| 1914 | }, []time.Time{now, now}}, |
| 1915 | {V0_11_0_0, false, []testMessage{ |
| 1916 | {testMsg, 1, now.Add(time.Second)}, |
| 1917 | {testMsg, 2, now.Add(2 * time.Second)}, |
| 1918 | }, []time.Time{now.Add(time.Second), now.Add(2 * time.Second)}}, |
| 1919 | {V0_11_0_0, true, []testMessage{ |
| 1920 | {testMsg, 1, now.Add(time.Second)}, |
| 1921 | {testMsg, 2, now.Add(2 * time.Second)}, |
| 1922 | }, []time.Time{now, now}}, |
| 1923 | } { |
| 1924 | var fr *FetchResponse |
| 1925 | cfg := NewTestConfig() |
| 1926 | cfg.Version = d.kversion |
| 1927 | switch { |
| 1928 | case d.kversion.IsAtLeast(V0_11_0_0): |
| 1929 | fr = &FetchResponse{Version: 5, LogAppendTime: d.logAppendTime, Timestamp: now} |
| 1930 | for _, m := range d.messages { |
| 1931 | fr.AddRecordWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp) |
| 1932 | } |
| 1933 | fr.SetLastOffsetDelta("my_topic", 0, 2) |
| 1934 | fr.SetLastStableOffset("my_topic", 0, 2) |
| 1935 | case d.kversion.IsAtLeast(V0_10_1_0): |
| 1936 | fr = &FetchResponse{Version: 3, LogAppendTime: d.logAppendTime, Timestamp: now} |
| 1937 | for _, m := range d.messages { |
| 1938 | fr.AddMessageWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp, 1) |
| 1939 | } |
nothing calls this directly
no test coverage detected