(t *testing.T)
| 216 | } |
| 217 | |
| 218 | func TestMixedCompressedMessages(t *testing.T) { |
| 219 | client, topic, shutdown := newLocalClientAndTopic() |
| 220 | defer shutdown() |
| 221 | |
| 222 | offset := 0 |
| 223 | var values []string |
| 224 | produce := func(n int, codec pkg.Codec) { |
| 225 | w := &kafka.Writer{ |
| 226 | Addr: kafka.TCP("127.0.0.1:9092"), |
| 227 | Topic: topic, |
| 228 | Transport: client.Transport, |
| 229 | } |
| 230 | defer w.Close() |
| 231 | |
| 232 | if codec != nil { |
| 233 | w.Compression = kafka.Compression(codec.Code()) |
| 234 | } |
| 235 | |
| 236 | msgs := make([]kafka.Message, n) |
| 237 | for i := range msgs { |
| 238 | value := fmt.Sprintf("Hello World %d!", offset) |
| 239 | values = append(values, value) |
| 240 | offset++ |
| 241 | msgs[i] = kafka.Message{Value: []byte(value)} |
| 242 | } |
| 243 | |
| 244 | ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) |
| 245 | defer cancel() |
| 246 | if err := w.WriteMessages(ctx, msgs...); err != nil { |
| 247 | t.Errorf("failed to produce messages: %+v", err) |
| 248 | } |
| 249 | } |
| 250 | |
| 251 | // produce messages that interleave uncompressed messages and messages with |
| 252 | // different compression codecs. reader should be able to properly handle |
| 253 | // all of them. |
| 254 | produce(10, nil) |
| 255 | produce(20, new(gzip.Codec)) |
| 256 | produce(5, nil) |
| 257 | produce(10, new(snappy.Codec)) |
| 258 | produce(10, new(lz4.Codec)) |
| 259 | produce(5, nil) |
| 260 | |
| 261 | r := kafka.NewReader(kafka.ReaderConfig{ |
| 262 | Brokers: []string{"127.0.0.1:9092"}, |
| 263 | Topic: topic, |
| 264 | Partition: 0, |
| 265 | MaxWait: 10 * time.Millisecond, |
| 266 | MinBytes: 1, |
| 267 | MaxBytes: 1024, |
| 268 | }) |
| 269 | defer r.Close() |
| 270 | |
| 271 | ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) |
| 272 | defer cancel() |
| 273 | |
| 274 | // in order to ensure that compressed messages are decompressed properly, |
| 275 | // read at offsets that we know to be in the middle of compressed message sets. |
nothing calls this directly
no test coverage detected