MCPcopy
hub / github.com/segmentio/kafka-go / TestWriteV2RecordBatch

Function TestWriteV2RecordBatch

write_test.go:194–247  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

192}
193
194func TestWriteV2RecordBatch(t *testing.T) {
195 if !ktesting.KafkaIsAtLeast("0.11.0") {
196 t.Skip("RecordBatch was added in kafka 0.11.0")
197 return
198 }
199
200 client, topic, shutdown := newLocalClientAndTopic()
201 defer shutdown()
202
203 msgs := make([]Message, 15)
204 for i := range msgs {
205 value := fmt.Sprintf("Sample message content: %d!", i)
206 msgs[i] = Message{Key: []byte("Key"), Value: []byte(value), Headers: []Header{{Key: "hk", Value: []byte("hv")}}}
207 }
208
209 w := &Writer{
210 Addr: TCP("localhost:9092"),
211 Topic: topic,
212 BatchTimeout: 100 * time.Millisecond,
213 BatchSize: 5,
214 Transport: client.Transport,
215 }
216
217 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
218 defer cancel()
219
220 if err := w.WriteMessages(ctx, msgs...); err != nil {
221 t.Errorf("Failed to write v2 messages to kafka: %v", err)
222 return
223 }
224 w.Close()
225
226 r := NewReader(ReaderConfig{
227 Brokers: []string{"localhost:9092"},
228 Topic: topic,
229 MaxWait: 100 * time.Millisecond,
230 })
231 defer r.Close()
232
233 msg, err := r.ReadMessage(context.Background())
234 if err != nil {
235 t.Error("Failed to read message")
236 return
237 }
238
239 if string(msg.Key) != "Key" {
240 t.Error("Received message's key doesn't match")
241 return
242 }
243 if msg.Headers[0].Key != "hk" {
244 t.Error("Received message header's key doesn't match")
245 return
246 }
247}

Callers

nothing calls this directly

Calls 8

WriteMessagesMethod · 0.95
CloseMethod · 0.95
CloseMethod · 0.95
ReadMessageMethod · 0.95
TCPFunction · 0.85
NewReaderFunction · 0.85
newLocalClientAndTopicFunction · 0.70
ErrorMethod · 0.45

Tested by

no test coverage detected