MCPcopy
hub / github.com/segmentio/kafka-go / testWriterBatchBytes

Function testWriterBatchBytes

writer_test.go:453–504  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

451}
452
453func testWriterBatchBytes(t *testing.T) {
454 topic := makeTopic()
455 createTopic(t, topic, 1)
456 defer deleteTopic(t, topic)
457
458 offset, err := readOffset(topic, 0)
459 if err != nil {
460 t.Fatal(err)
461 }
462
463 w := newTestWriter(WriterConfig{
464 Topic: topic,
465 BatchBytes: 50,
466 BatchTimeout: math.MaxInt32 * time.Second,
467 Balancer: &RoundRobin{},
468 })
469 defer w.Close()
470
471 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
472 defer cancel()
473 if err := w.WriteMessages(ctx, []Message{
474 {Value: []byte("M0")}, // 25 Bytes
475 {Value: []byte("M1")}, // 25 Bytes
476 {Value: []byte("M2")}, // 25 Bytes
477 {Value: []byte("M3")}, // 25 Bytes
478 }...); err != nil {
479 t.Error(err)
480 return
481 }
482
483 if w.Stats().Writes != 2 {
484 t.Error("didn't create expected batches")
485 return
486 }
487 msgs, err := readPartition(topic, 0, offset)
488 if err != nil {
489 t.Error("error reading partition", err)
490 return
491 }
492
493 if len(msgs) != 4 {
494 t.Error("bad messages in partition", msgs)
495 return
496 }
497
498 for i, m := range msgs {
499 if string(m.Value) == "M"+strconv.Itoa(i) {
500 continue
501 }
502 t.Error("bad messages in partition", string(m.Value))
503 }
504}
505
506func testWriterBatchSize(t *testing.T) {
507 topic := makeTopic()

Callers

nothing calls this directly

Calls 10

createTopicFunction · 0.85
deleteTopicFunction · 0.85
readOffsetFunction · 0.85
newTestWriterFunction · 0.85
readPartitionFunction · 0.85
makeTopicFunction · 0.70
CloseMethod · 0.45
WriteMessagesMethod · 0.45
ErrorMethod · 0.45
StatsMethod · 0.45

Tested by

no test coverage detected