MCPcopy
hub / github.com/IBM/sarama / TestConsumerTimestamps

Function TestConsumerTimestamps

consumer_test.go:1882–1992  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

1880}
1881
1882func TestConsumerTimestamps(t *testing.T) {
1883 now := time.Now().Truncate(time.Millisecond)
1884 type testMessage struct {
1885 key Encoder
1886 offset int64
1887 timestamp time.Time
1888 }
1889 for _, d := range []struct {
1890 kversion KafkaVersion
1891 logAppendTime bool
1892 messages []testMessage
1893 expectedTimestamp []time.Time
1894 }{
1895 {MinVersion, false, []testMessage{
1896 {testMsg, 1, now},
1897 {testMsg, 2, now},
1898 }, []time.Time{{}, {}}},
1899 {V0_9_0_0, false, []testMessage{
1900 {testMsg, 1, now},
1901 {testMsg, 2, now},
1902 }, []time.Time{{}, {}}},
1903 {V0_10_0_0, false, []testMessage{
1904 {testMsg, 1, now},
1905 {testMsg, 2, now},
1906 }, []time.Time{{}, {}}},
1907 {V0_10_2_1, false, []testMessage{
1908 {testMsg, 1, now.Add(time.Second)},
1909 {testMsg, 2, now.Add(2 * time.Second)},
1910 }, []time.Time{now.Add(time.Second), now.Add(2 * time.Second)}},
1911 {V0_10_2_1, true, []testMessage{
1912 {testMsg, 1, now.Add(time.Second)},
1913 {testMsg, 2, now.Add(2 * time.Second)},
1914 }, []time.Time{now, now}},
1915 {V0_11_0_0, false, []testMessage{
1916 {testMsg, 1, now.Add(time.Second)},
1917 {testMsg, 2, now.Add(2 * time.Second)},
1918 }, []time.Time{now.Add(time.Second), now.Add(2 * time.Second)}},
1919 {V0_11_0_0, true, []testMessage{
1920 {testMsg, 1, now.Add(time.Second)},
1921 {testMsg, 2, now.Add(2 * time.Second)},
1922 }, []time.Time{now, now}},
1923 } {
1924 var fr *FetchResponse
1925 cfg := NewTestConfig()
1926 cfg.Version = d.kversion
1927 switch {
1928 case d.kversion.IsAtLeast(V0_11_0_0):
1929 fr = &FetchResponse{Version: 5, LogAppendTime: d.logAppendTime, Timestamp: now}
1930 for _, m := range d.messages {
1931 fr.AddRecordWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp)
1932 }
1933 fr.SetLastOffsetDelta("my_topic", 0, 2)
1934 fr.SetLastStableOffset("my_topic", 0, 2)
1935 case d.kversion.IsAtLeast(V0_10_1_0):
1936 fr = &FetchResponse{Version: 3, LogAppendTime: d.logAppendTime, Timestamp: now}
1937 for _, m := range d.messages {
1938 fr.AddMessageWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp, 1)
1939 }

Callers

nothing calls this directly

Calls 15

SetLastOffsetDeltaMethod · 0.95
SetLastStableOffsetMethod · 0.95
SetHandlerByMapMethod · 0.95
AddrMethod · 0.95
BrokerIDMethod · 0.95
ConsumePartitionMethod · 0.95
CloseMethod · 0.95
NewMockBrokerFunction · 0.85
NewMockMetadataResponseFunction · 0.85
NewMockOffsetResponseFunction · 0.85

Tested by

no test coverage detected