MCPcopy
hub / github.com/segmentio/kafka-go / testCompressedMessages

Function testCompressedMessages

compress/compress_test.go:148–216  ·  compress/compress_test.go::testCompressedMessages
(t *testing.T, codec pkg.Codec)

Source from the content-addressed store, hash-verified

146}
147
148func testCompressedMessages(t *testing.T, codec pkg.Codec) {
149 t.Run(codec.Name(), func(t *testing.T) {
150 client, topic, shutdown := newLocalClientAndTopic()
151 defer shutdown()
152
153 w := &kafka.Writer{
154 Addr: kafka.TCP("127.0.0.1:9092"),
155 Topic: topic,
156 Compression: kafka.Compression(codec.Code()),
157 BatchTimeout: 10 * time.Millisecond,
158 Transport: client.Transport,
159 }
160 defer w.Close()
161
162 offset := 0
163 var values []string
164 for i := 0; i < 10; i++ {
165 batch := make([]kafka.Message, i+1)
166 for j := range batch {
167 value := fmt.Sprintf("Hello World %d!", offset)
168 values = append(values, value)
169 batch[j] = kafka.Message{
170 Key: []byte(strconv.Itoa(offset)),
171 Value: []byte(value),
172 }
173 offset++
174 }
175 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
176 if err := w.WriteMessages(ctx, batch...); err != nil {
177 t.Errorf("error sending batch %d, reason: %+v", i+1, err)
178 }
179 cancel()
180 }
181
182 r := kafka.NewReader(kafka.ReaderConfig{
183 Brokers: []string{"127.0.0.1:9092"},
184 Topic: topic,
185 Partition: 0,
186 MaxWait: 10 * time.Millisecond,
187 MinBytes: 1,
188 MaxBytes: 1024,
189 })
190 defer r.Close()
191
192 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
193 defer cancel()
194
195 // in order to ensure proper handling of decompressing message, read at
196 // offsets that we know to be in the middle of compressed message sets.
197 for base := range values {
198 r.SetOffset(int64(base))
199 for i := base; i < len(values); i++ {
200 msg, err := r.ReadMessage(ctx)
201 if err != nil {
202 t.Fatalf("error receiving message at loop %d, offset %d, reason: %+v", base, i, err)
203 }
204 if msg.Offset != int64(i) {
205 t.Fatalf("wrong offset at loop %d...expected %d but got %d", base, i, msg.Offset)

Callers 1

TestCompressedMessagesFunction · 0.85

Calls 10

CloseMethod · 0.95
WriteMessagesMethod · 0.95
CloseMethod · 0.95
SetOffsetMethod · 0.95
ReadMessageMethod · 0.95
CompressionMethod · 0.80
newLocalClientAndTopicFunction · 0.70
NameMethod · 0.65
CodeMethod · 0.65
NewReaderMethod · 0.65

Tested by

no test coverage detected