MCPcopy
hub / github.com/segmentio/kafka-go / TestReaderTruncatedResponse

Function TestReaderTruncatedResponse

reader_test.go:1730–1799  ·  view source on GitHub ↗

Tests that the reader can handle messages where the response is truncated due to reaching MaxBytes. If MaxBytes is too small to fit 1 record then it will never truncate, so we start from a small message size and increase it until we are sure truncation has happened at some point.

(t *testing.T)

Source from the content-addressed store, hash-verified

1728// we start from a small message size and increase it until we are sure
1729// truncation has happened at some point.
1730func TestReaderTruncatedResponse(t *testing.T) {
1731 topic := makeTopic()
1732 createTopic(t, topic, 1)
1733 defer deleteTopic(t, topic)
1734
1735 readerMaxBytes := 100
1736 batchSize := 4
1737 maxMsgPadding := 5
1738 readContextTimeout := 10 * time.Second
1739
1740 var msgs []Message
1741 // The key of each message
1742 n := 0
1743 // `i` is the amount of padding per message
1744 for i := 0; i < maxMsgPadding; i++ {
1745 bb := bytes.Buffer{}
1746 for x := 0; x < i; x++ {
1747 _, err := bb.WriteRune('0')
1748 require.NoError(t, err)
1749 }
1750 padding := bb.Bytes()
1751 // `j` is the number of times the message repeats
1752 for j := 0; j < batchSize*4; j++ {
1753 msgs = append(msgs, Message{
1754 Key: []byte(fmt.Sprintf("%05d", n)),
1755 Value: padding,
1756 })
1757 n++
1758 }
1759 }
1760
1761 wr := NewWriter(WriterConfig{
1762 Brokers: []string{"localhost:9092"},
1763 BatchSize: batchSize,
1764 Async: false,
1765 Topic: topic,
1766 Balancer: &LeastBytes{},
1767 })
1768 err := wr.WriteMessages(context.Background(), msgs...)
1769 require.NoError(t, err)
1770
1771 ctx, cancel := context.WithTimeout(context.Background(), readContextTimeout)
1772 defer cancel()
1773 r := NewReader(ReaderConfig{
1774 Brokers: []string{"localhost:9092"},
1775 Topic: topic,
1776 MinBytes: 1,
1777 MaxBytes: readerMaxBytes,
1778 // Speed up testing
1779 MaxWait: 100 * time.Millisecond,
1780 })
1781 defer r.Close()
1782
1783 expectedKeys := map[string]struct{}{}
1784 for _, k := range msgs {
1785 expectedKeys[string(k.Key)] = struct{}{}
1786 }
1787 keys := map[string]struct{}{}

Callers

nothing calls this directly

Calls 9

WriteMessagesMethod · 0.95
CloseMethod · 0.95
FetchMessageMethod · 0.95
createTopicFunction · 0.85
deleteTopicFunction · 0.85
NewWriterFunction · 0.85
NewReaderFunction · 0.85
makeTopicFunction · 0.70
BytesMethod · 0.65

Tested by

no test coverage detected