Tests that the reader can read record batches from log compacted topics where the batch ends with compacted records. This test forces varying sized chunks of duplicated messages along with configuring the topic with a minimal `segment.bytes` in order to guarantee that at least 1 batch can be compac
(t *testing.T)
| 1806 | // guarantee that at least 1 batch can be compacted down to 0 "unread" messages |
| 1807 | // with at least 1 "old" message otherwise the batch is skipped entirely. |
| 1808 | func TestReaderReadCompactedMessage(t *testing.T) { |
| 1809 | topic := makeTopic() |
| 1810 | createTopicWithCompaction(t, topic, 1) |
| 1811 | defer deleteTopic(t, topic) |
| 1812 | |
| 1813 | msgs := makeTestDuplicateSequence() |
| 1814 | |
| 1815 | writeMessagesForCompactionCheck(t, topic, msgs) |
| 1816 | |
| 1817 | expectedKeys := map[string]int{} |
| 1818 | for _, msg := range msgs { |
| 1819 | expectedKeys[string(msg.Key)] = 1 |
| 1820 | } |
| 1821 | |
| 1822 | // kafka 2.0.1 is extra slow |
| 1823 | ctx, cancel := context.WithTimeout(context.Background(), time.Second*120) |
| 1824 | defer cancel() |
| 1825 | for { |
| 1826 | success := func() bool { |
| 1827 | r := NewReader(ReaderConfig{ |
| 1828 | Brokers: []string{"localhost:9092"}, |
| 1829 | Topic: topic, |
| 1830 | MinBytes: 200, |
| 1831 | MaxBytes: 200, |
| 1832 | // Speed up testing |
| 1833 | MaxWait: 100 * time.Millisecond, |
| 1834 | }) |
| 1835 | defer r.Close() |
| 1836 | |
| 1837 | keys := map[string]int{} |
| 1838 | for { |
| 1839 | m, err := r.FetchMessage(ctx) |
| 1840 | if err != nil { |
| 1841 | t.Logf("can't get message from compacted log: %v", err) |
| 1842 | return false |
| 1843 | } |
| 1844 | keys[string(m.Key)]++ |
| 1845 | |
| 1846 | if len(keys) == countKeys(msgs) { |
| 1847 | t.Logf("got keys: %+v", keys) |
| 1848 | return reflect.DeepEqual(keys, expectedKeys) |
| 1849 | } |
| 1850 | } |
| 1851 | }() |
| 1852 | if success { |
| 1853 | return |
| 1854 | } |
| 1855 | select { |
| 1856 | case <-ctx.Done(): |
| 1857 | t.Fatal(ctx.Err()) |
| 1858 | default: |
| 1859 | } |
| 1860 | } |
| 1861 | } |
| 1862 | |
| 1863 | // writeMessagesForCompactionCheck writes messages with specific writer configuration. |
| 1864 | func writeMessagesForCompactionCheck(t *testing.T, topic string, msgs []Message) { |
nothing calls this directly
no test coverage detected