MCPcopy
hub / github.com/segmentio/kafka-go / testWriterSmallBatchBytes

Function testWriterSmallBatchBytes

writer_test.go:557–606  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

555}
556
557func testWriterSmallBatchBytes(t *testing.T) {
558 topic := makeTopic()
559 createTopic(t, topic, 1)
560 defer deleteTopic(t, topic)
561
562 offset, err := readOffset(topic, 0)
563 if err != nil {
564 t.Fatal(err)
565 }
566
567 w := newTestWriter(WriterConfig{
568 Topic: topic,
569 BatchBytes: 25,
570 BatchTimeout: 50 * time.Millisecond,
571 Balancer: &RoundRobin{},
572 })
573 defer w.Close()
574
575 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
576 defer cancel()
577 if err := w.WriteMessages(ctx, []Message{
578 {Value: []byte("Hi")}, // 24 Bytes
579 {Value: []byte("By")}, // 24 Bytes
580 }...); err != nil {
581 t.Error(err)
582 return
583 }
584 ws := w.Stats()
585 if ws.Writes != 2 {
586 t.Error("didn't batch messages; Writes: ", ws.Writes)
587 return
588 }
589 msgs, err := readPartition(topic, 0, offset)
590 if err != nil {
591 t.Error("error reading partition", err)
592 return
593 }
594
595 if len(msgs) != 2 {
596 t.Error("bad messages in partition", msgs)
597 return
598 }
599
600 for _, m := range msgs {
601 if string(m.Value) == "Hi" || string(m.Value) == "By" {
602 continue
603 }
604 t.Error("bad messages in partition", msgs)
605 }
606}
607
608func testWriterBatchBytesHeaders(t *testing.T) {
609 topic := makeTopic()

Callers

nothing calls this directly

Calls 10

createTopic — Function · 0.85
deleteTopic — Function · 0.85
readOffset — Function · 0.85
newTestWriter — Function · 0.85
readPartition — Function · 0.85
makeTopic — Function · 0.70
Close — Method · 0.45
WriteMessages — Method · 0.45
Error — Method · 0.45
Stats — Method · 0.45

Tested by

no test coverage detected