MCPcopy
hub / github.com/segmentio/kafka-go / testWriterBatchSize

Function testWriterBatchSize

writer_test.go:506–555  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

504}
505
506func testWriterBatchSize(t *testing.T) {
507 topic := makeTopic()
508 createTopic(t, topic, 1)
509 defer deleteTopic(t, topic)
510
511 offset, err := readOffset(topic, 0)
512 if err != nil {
513 t.Fatal(err)
514 }
515
516 w := newTestWriter(WriterConfig{
517 Topic: topic,
518 BatchSize: 2,
519 BatchTimeout: math.MaxInt32 * time.Second,
520 Balancer: &RoundRobin{},
521 })
522 defer w.Close()
523
524 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
525 defer cancel()
526 if err := w.WriteMessages(ctx, []Message{
527 {Value: []byte("Hi")}, // 24 Bytes
528 {Value: []byte("By")}, // 24 Bytes
529 }...); err != nil {
530 t.Error(err)
531 return
532 }
533
534 if w.Stats().Writes > 1 {
535 t.Error("didn't batch messages")
536 return
537 }
538 msgs, err := readPartition(topic, 0, offset)
539 if err != nil {
540 t.Error("error reading partition", err)
541 return
542 }
543
544 if len(msgs) != 2 {
545 t.Error("bad messages in partition", msgs)
546 return
547 }
548
549 for _, m := range msgs {
550 if string(m.Value) == "Hi" || string(m.Value) == "By" {
551 continue
552 }
553 t.Error("bad messages in partition", msgs)
554 }
555}
556
557func testWriterSmallBatchBytes(t *testing.T) {
558 topic := makeTopic()

Callers

nothing calls this directly

Calls 10

createTopic — Function · 0.85
deleteTopic — Function · 0.85
readOffset — Function · 0.85
newTestWriter — Function · 0.85
readPartition — Function · 0.85
makeTopic — Function · 0.70
Close — Method · 0.45
WriteMessages — Method · 0.45
Error — Method · 0.45
Stats — Method · 0.45

Tested by

no test coverage detected