Tests that the reader can handle messages where the response is truncated due to reaching MaxBytes. If MaxBytes is too small to fit even one record, the response is never truncated, so we start from a small message size and increase it until we are sure truncation has happened at some point.
(t *testing.T)
| 1728 | // we start from a small message size and increase it until we are sure |
| 1729 | // truncation has happened at some point. |
| 1730 | func TestReaderTruncatedResponse(t *testing.T) { |
| 1731 | topic := makeTopic() |
| 1732 | createTopic(t, topic, 1) |
| 1733 | defer deleteTopic(t, topic) |
| 1734 | |
| 1735 | readerMaxBytes := 100 |
| 1736 | batchSize := 4 |
| 1737 | maxMsgPadding := 5 |
| 1738 | readContextTimeout := 10 * time.Second |
| 1739 | |
| 1740 | var msgs []Message |
| 1741 | // The key of each message |
| 1742 | n := 0 |
| 1743 | // `i` is the amount of padding per message |
| 1744 | for i := 0; i < maxMsgPadding; i++ { |
| 1745 | bb := bytes.Buffer{} |
| 1746 | for x := 0; x < i; x++ { |
| 1747 | _, err := bb.WriteRune('0') |
| 1748 | require.NoError(t, err) |
| 1749 | } |
| 1750 | padding := bb.Bytes() |
| 1751 | // `j` is the number of times the message repeats |
| 1752 | for j := 0; j < batchSize*4; j++ { |
| 1753 | msgs = append(msgs, Message{ |
| 1754 | Key: []byte(fmt.Sprintf("%05d", n)), |
| 1755 | Value: padding, |
| 1756 | }) |
| 1757 | n++ |
| 1758 | } |
| 1759 | } |
| 1760 | |
| 1761 | wr := NewWriter(WriterConfig{ |
| 1762 | Brokers: []string{"localhost:9092"}, |
| 1763 | BatchSize: batchSize, |
| 1764 | Async: false, |
| 1765 | Topic: topic, |
| 1766 | Balancer: &LeastBytes{}, |
| 1767 | }) |
| 1768 | err := wr.WriteMessages(context.Background(), msgs...) |
| 1769 | require.NoError(t, err) |
| 1770 | |
| 1771 | ctx, cancel := context.WithTimeout(context.Background(), readContextTimeout) |
| 1772 | defer cancel() |
| 1773 | r := NewReader(ReaderConfig{ |
| 1774 | Brokers: []string{"localhost:9092"}, |
| 1775 | Topic: topic, |
| 1776 | MinBytes: 1, |
| 1777 | MaxBytes: readerMaxBytes, |
| 1778 | // Speed up testing |
| 1779 | MaxWait: 100 * time.Millisecond, |
| 1780 | }) |
| 1781 | defer r.Close() |
| 1782 | |
| 1783 | expectedKeys := map[string]struct{}{} |
| 1784 | for _, k := range msgs { |
| 1785 | expectedKeys[string(k.Key)] = struct{}{} |
| 1786 | } |
| 1787 | keys := map[string]struct{}{} |
nothing calls this directly
no test coverage detected