MCPcopy
hub / github.com/segmentio/kafka-go / TestReaderReadCompactedMessage

Function TestReaderReadCompactedMessage

reader_test.go:1808–1861  ·  view source on GitHub ↗

Tests that the reader can read record batches from log-compacted topics where the batch ends with compacted records. This test forces varying-sized chunks of duplicated messages and configures the topic with a minimal `segment.bytes` in order to guarantee that at least 1 batch can be compacted down to 0 "unread" messages with at least 1 "old" message; otherwise the batch is skipped entirely.

(t *testing.T)

Source from the content-addressed store, hash-verified

1806// guarantee that at least 1 batch can be compacted down to 0 "unread" messages
1807// with at least 1 "old" message otherwise the batch is skipped entirely.
1808func TestReaderReadCompactedMessage(t *testing.T) {
1809 topic := makeTopic()
1810 createTopicWithCompaction(t, topic, 1)
1811 defer deleteTopic(t, topic)
1812
1813 msgs := makeTestDuplicateSequence()
1814
1815 writeMessagesForCompactionCheck(t, topic, msgs)
1816
1817 expectedKeys := map[string]int{}
1818 for _, msg := range msgs {
1819 expectedKeys[string(msg.Key)] = 1
1820 }
1821
1822 // kafka 2.0.1 is extra slow
1823 ctx, cancel := context.WithTimeout(context.Background(), time.Second*120)
1824 defer cancel()
1825 for {
1826 success := func() bool {
1827 r := NewReader(ReaderConfig{
1828 Brokers: []string{"localhost:9092"},
1829 Topic: topic,
1830 MinBytes: 200,
1831 MaxBytes: 200,
1832 // Speed up testing
1833 MaxWait: 100 * time.Millisecond,
1834 })
1835 defer r.Close()
1836
1837 keys := map[string]int{}
1838 for {
1839 m, err := r.FetchMessage(ctx)
1840 if err != nil {
1841 t.Logf("can't get message from compacted log: %v", err)
1842 return false
1843 }
1844 keys[string(m.Key)]++
1845
1846 if len(keys) == countKeys(msgs) {
1847 t.Logf("got keys: %+v", keys)
1848 return reflect.DeepEqual(keys, expectedKeys)
1849 }
1850 }
1851 }()
1852 if success {
1853 return
1854 }
1855 select {
1856 case <-ctx.Done():
1857 t.Fatal(ctx.Err())
1858 default:
1859 }
1860 }
1861}
1862
1863// writeMessagesForCompactionCheck writes messages with specific writer configuration.
1864func writeMessagesForCompactionCheck(t *testing.T, topic string, msgs []Message) {

Callers

nothing calls this directly

Calls 11

CloseMethod · 0.95
FetchMessageMethod · 0.95
deleteTopicFunction · 0.85
NewReaderFunction · 0.85
countKeysFunction · 0.85
DoneMethod · 0.80
makeTopicFunction · 0.70
ErrMethod · 0.45

Tested by

no test coverage detected