MCPcopy
hub / github.com/segmentio/kafka-go / TestMixedCompressedMessages

Function TestMixedCompressedMessages

compress/compress_test.go:218–291  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

216}
217
218func TestMixedCompressedMessages(t *testing.T) {
219 client, topic, shutdown := newLocalClientAndTopic()
220 defer shutdown()
221
222 offset := 0
223 var values []string
224 produce := func(n int, codec pkg.Codec) {
225 w := &kafka.Writer{
226 Addr: kafka.TCP("127.0.0.1:9092"),
227 Topic: topic,
228 Transport: client.Transport,
229 }
230 defer w.Close()
231
232 if codec != nil {
233 w.Compression = kafka.Compression(codec.Code())
234 }
235
236 msgs := make([]kafka.Message, n)
237 for i := range msgs {
238 value := fmt.Sprintf("Hello World %d!", offset)
239 values = append(values, value)
240 offset++
241 msgs[i] = kafka.Message{Value: []byte(value)}
242 }
243
244 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
245 defer cancel()
246 if err := w.WriteMessages(ctx, msgs...); err != nil {
247 t.Errorf("failed to produce messages: %+v", err)
248 }
249 }
250
251 // produce messages that interleave uncompressed messages and messages with
252 // different compression codecs. reader should be able to properly handle
253 // all of them.
254 produce(10, nil)
255 produce(20, new(gzip.Codec))
256 produce(5, nil)
257 produce(10, new(snappy.Codec))
258 produce(10, new(lz4.Codec))
259 produce(5, nil)
260
261 r := kafka.NewReader(kafka.ReaderConfig{
262 Brokers: []string{"127.0.0.1:9092"},
263 Topic: topic,
264 Partition: 0,
265 MaxWait: 10 * time.Millisecond,
266 MinBytes: 1,
267 MaxBytes: 1024,
268 })
269 defer r.Close()
270
271 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
272 defer cancel()
273
274 // in order to ensure proper handling of decompressing message, read at
275 // offsets that we know to be in the middle of compressed message sets.

Callers

nothing calls this directly

Calls 9

Close (Method) · 0.95
WriteMessages (Method) · 0.95
Close (Method) · 0.95
SetOffset (Method) · 0.95
ReadMessage (Method) · 0.95
Compression (Method) · 0.80
newLocalClientAndTopic (Function) · 0.70
Code (Method) · 0.65
NewReader (Method) · 0.65

Tested by

no test coverage detected