(t *testing.T, codec pkg.Codec)
| 146 | } |
| 147 | |
| 148 | func testCompressedMessages(t *testing.T, codec pkg.Codec) { |
| 149 | t.Run(codec.Name(), func(t *testing.T) { |
| 150 | client, topic, shutdown := newLocalClientAndTopic() |
| 151 | defer shutdown() |
| 152 | |
| 153 | w := &kafka.Writer{ |
| 154 | Addr: kafka.TCP("127.0.0.1:9092"), |
| 155 | Topic: topic, |
| 156 | Compression: kafka.Compression(codec.Code()), |
| 157 | BatchTimeout: 10 * time.Millisecond, |
| 158 | Transport: client.Transport, |
| 159 | } |
| 160 | defer w.Close() |
| 161 | |
| 162 | offset := 0 |
| 163 | var values []string |
| 164 | for i := 0; i < 10; i++ { |
| 165 | batch := make([]kafka.Message, i+1) |
| 166 | for j := range batch { |
| 167 | value := fmt.Sprintf("Hello World %d!", offset) |
| 168 | values = append(values, value) |
| 169 | batch[j] = kafka.Message{ |
| 170 | Key: []byte(strconv.Itoa(offset)), |
| 171 | Value: []byte(value), |
| 172 | } |
| 173 | offset++ |
| 174 | } |
| 175 | ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) |
| 176 | if err := w.WriteMessages(ctx, batch...); err != nil { |
| 177 | t.Errorf("error sending batch %d, reason: %+v", i+1, err) |
| 178 | } |
| 179 | cancel() |
| 180 | } |
| 181 | |
| 182 | r := kafka.NewReader(kafka.ReaderConfig{ |
| 183 | Brokers: []string{"127.0.0.1:9092"}, |
| 184 | Topic: topic, |
| 185 | Partition: 0, |
| 186 | MaxWait: 10 * time.Millisecond, |
| 187 | MinBytes: 1, |
| 188 | MaxBytes: 1024, |
| 189 | }) |
| 190 | defer r.Close() |
| 191 | |
| 192 | ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) |
| 193 | defer cancel() |
| 194 | |
| 195 | // in order to ensure proper handling of decompressing message, read at |
| 196 | // offsets that we know to be in the middle of compressed message sets. |
| 197 | for base := range values { |
| 198 | r.SetOffset(int64(base)) |
| 199 | for i := base; i < len(values); i++ { |
| 200 | msg, err := r.ReadMessage(ctx) |
| 201 | if err != nil { |
| 202 | t.Fatalf("error receiving message at loop %d, offset %d, reason: %+v", base, i, err) |
| 203 | } |
| 204 | if msg.Offset != int64(i) { |
| 205 | t.Fatalf("wrong offset at loop %d...expected %d but got %d", base, i, msg.Offset) |
no test coverage detected