(t *testing.T)
| 192 | } |
| 193 | |
| 194 | func TestWriteV2RecordBatch(t *testing.T) { |
| 195 | if !ktesting.KafkaIsAtLeast("0.11.0") { |
| 196 | t.Skip("RecordBatch was added in kafka 0.11.0") |
| 197 | return |
| 198 | } |
| 199 | |
| 200 | client, topic, shutdown := newLocalClientAndTopic() |
| 201 | defer shutdown() |
| 202 | |
| 203 | msgs := make([]Message, 15) |
| 204 | for i := range msgs { |
| 205 | value := fmt.Sprintf("Sample message content: %d!", i) |
| 206 | msgs[i] = Message{Key: []byte("Key"), Value: []byte(value), Headers: []Header{{Key: "hk", Value: []byte("hv")}}} |
| 207 | } |
| 208 | |
| 209 | w := &Writer{ |
| 210 | Addr: TCP("localhost:9092"), |
| 211 | Topic: topic, |
| 212 | BatchTimeout: 100 * time.Millisecond, |
| 213 | BatchSize: 5, |
| 214 | Transport: client.Transport, |
| 215 | } |
| 216 | |
| 217 | ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) |
| 218 | defer cancel() |
| 219 | |
| 220 | if err := w.WriteMessages(ctx, msgs...); err != nil { |
| 221 | t.Errorf("Failed to write v2 messages to kafka: %v", err) |
| 222 | return |
| 223 | } |
| 224 | w.Close() |
| 225 | |
| 226 | r := NewReader(ReaderConfig{ |
| 227 | Brokers: []string{"localhost:9092"}, |
| 228 | Topic: topic, |
| 229 | MaxWait: 100 * time.Millisecond, |
| 230 | }) |
| 231 | defer r.Close() |
| 232 | |
| 233 | msg, err := r.ReadMessage(context.Background()) |
| 234 | if err != nil { |
| 235 | t.Error("Failed to read message") |
| 236 | return |
| 237 | } |
| 238 | |
| 239 | if string(msg.Key) != "Key" { |
| 240 | t.Error("Received message's key doesn't match") |
| 241 | return |
| 242 | } |
| 243 | if msg.Headers[0].Key != "hk" { |
| 244 | t.Error("Received message header's key doesn't match") |
| 245 | return |
| 246 | } |
| 247 | } |
nothing calls this directly
no test coverage detected